From 18aca86a159dd0daf0e6c7334de0d9a70e4bc788 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 18 Apr 2019 13:19:04 -0400 Subject: [PATCH] Exposing AutoExecutor and CallbackExecutor directly (#4983) --- .../cloud/pubsub/v1/MessageDispatcher.java | 4 +- .../com/google/cloud/pubsub/v1/Publisher.java | 4 +- .../pubsub/v1/SequentialExecutorService.java | 46 +++++++++---------- .../v1/SequentialExecutorServiceTest.java | 16 +++---- 4 files changed, 33 insertions(+), 37 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 88d7ee072c22..aabd700bae04 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -62,7 +62,7 @@ class MessageDispatcher { @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; - private final SequentialExecutorService sequentialExecutor; + private final SequentialExecutorService.AutoExecutor sequentialExecutor; private final ScheduledExecutorService systemExecutor; private final ApiClock clock; @@ -206,7 +206,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; - this.sequentialExecutor = new SequentialExecutorService(executor); + this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor); } void start() { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 51d1cf5d4ed7..555568dfcbe9 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -99,7 +99,7 @@ public class Publisher { private final PublisherStub publisherStub; private final ScheduledExecutorService executor; - private final SequentialExecutorService sequentialExecutor; + private final SequentialExecutorService.CallbackExecutor sequentialExecutor; private final AtomicBoolean shutdown; private final List closeables; private final MessageWaiter messagesWaiter; @@ -127,7 +127,7 @@ private Publisher(Builder builder) throws IOException { messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); - sequentialExecutor = new SequentialExecutorService(executor); + sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor); if (builder.executorProvider.shouldAutoClose()) { closeables = Collections.singletonList(new ExecutorAsBackgroundResource(executor)); diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 06eca64b2b00..892d9f6420c7 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -42,36 +42,18 @@ interface CancellableRunnable extends Runnable { * be run in parallel. */ final class SequentialExecutorService { - private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName()); - private final CallbackExecutor callbackExecutor; - private final AutoExecutor autoExecutor; - - SequentialExecutorService(Executor executor) { - this.callbackExecutor = new CallbackExecutor(executor); - this.autoExecutor = new AutoExecutor(executor); - } - - /** - * Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks - * with the same key that have not been executed will be cancelled. - */ - ApiFuture submit(final String key, final Callable> callable) { - return callbackExecutor.submit(key, callable); - } - - /** Runs synchronous {@code Runnable} tasks sequentially. */ - void submit(String key, Runnable runnable) { - autoExecutor.execute(key, runnable); + // This class is not directly usable. + private SequentialExecutorService() { } /** - * Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and + * This Executor takes a serial stream of string keys and * {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same * key will be run only when its predecessor has been completed while tasks with different keys * can be run in parallel. */ - abstract static class SequentialExecutor { + private abstract static class SequentialExecutor { // Maps keys to tasks. protected final Map> tasksByKey; protected final Executor executor; @@ -81,7 +63,7 @@ private SequentialExecutor(Executor executor) { this.tasksByKey = new HashMap<>(); } - void execute(final String key, Runnable task) { + protected void execute(final String key, Runnable task) { Deque newTasks; synchronized (tasksByKey) { newTasks = tasksByKey.get(key); @@ -110,11 +92,16 @@ protected void invokeCallback(final Deque tasks) { } } - private static class AutoExecutor extends SequentialExecutor { + static class AutoExecutor extends SequentialExecutor { AutoExecutor(Executor executor) { super(executor); } + /** Runs synchronous {@code Runnable} tasks sequentially. */ + void submit(String key, Runnable task) { + super.execute(key, task); + } + @Override protected void execute(final String key, final Deque tasks) { executor.execute( @@ -142,12 +129,21 @@ private void invokeCallbackAndExecuteNext(final String key, final Deque * This method does the following in a chain: * *
    diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java index 00ea85533e06..7b35f1ada745 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java @@ -71,8 +71,8 @@ public void finish() { @Test public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService(executorProvider.getExecutor()); + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable(); @@ -97,8 +97,8 @@ public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception @Test public void testExecutorRunsDifferentKeySimultaneously() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService(executorProvider.getExecutor()); + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable(); @@ -126,8 +126,8 @@ public void testExecutorRunsDifferentKeySimultaneously() throws Exception { @Test public void testExecutorCancelsAllTasksWhenOneFailed() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService(executorProvider.getExecutor()); + SequentialExecutorService.CallbackExecutor sequentialExecutorService = + new SequentialExecutorService.CallbackExecutor(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable(); @@ -207,8 +207,8 @@ public void run() { public void SequentialExecutorRunsTasksAutomatically() throws Exception { int numKeys = 100; int numTasks = 100; - SequentialExecutorService sequentialExecutor = - new SequentialExecutorService(executorProvider.getExecutor()); + SequentialExecutorService.AutoExecutor sequentialExecutor = + new SequentialExecutorService.AutoExecutor(executorProvider.getExecutor()); CountDownLatch remainingTasksCount = new CountDownLatch(numKeys * numTasks); // Maps keys to lists of started and completed tasks. Map> startedTasks = new HashMap<>();