diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 0e1c62906973..61aabdbb1b1f 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -101,28 +101,6 @@ void execute(final String key, Runnable task) { protected abstract void execute(String key, Deque finalTasks); - /** Cancels every task in the queue assoicated with {@code key}. */ - void cancelQueuedTasks(final String key, Throwable e) { - // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, - // so that no more tasks are scheduled. - synchronized (tasksByKey) { - final Deque tasks = tasksByKey.get(key); - if (tasks == null) { - return; - } - while (!tasks.isEmpty()) { - Runnable task = tasks.poll(); - if (task instanceof CancellableRunnable) { - ((CancellableRunnable) task).cancel(e); - } else { - logger.log( - Level.WARNING, - "Attempted to cancel Runnable that was not CancellableRunnable; ignored."); - } - } - } - } - protected void invokeCallback(final Deque tasks) { // TODO(kimkyung-goog): Check if there is a race when task list becomes empty. Runnable task = tasks.poll(); @@ -130,6 +108,22 @@ protected void invokeCallback(final Deque tasks) { task.run(); } } + } + + private static class AutoExecutor extends SequentialExecutor { + AutoExecutor(Executor executor) { + super(executor); + } + + protected void execute(final String key, final Deque finalTasks) { + executor.execute( + new Runnable() { + @Override + public void run() { + invokeCallbackAndExecuteNext(key, finalTasks); + } + }); + } protected void invokeCallbackAndExecuteNext(final String key, final Deque tasks) { invokeCallback(tasks); @@ -153,22 +147,6 @@ public void run() { } } - private static class AutoExecutor extends SequentialExecutor { - AutoExecutor(Executor executor) { - super(executor); - } - - protected void execute(final String key, final Deque finalTasks) { - executor.execute( - new Runnable() { - @Override - public void run() { - invokeCallbackAndExecuteNext(key, finalTasks); - } - }); - } - } - private static class CallbackExecutor extends SequentialExecutor { CallbackExecutor(Executor executor) { super(executor); @@ -255,4 +233,26 @@ public void run() { }); } } + + /** Cancels every task in the queue assoicated with {@code key}. */ + void cancelQueuedTasks(final String key, Throwable e) { + // TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked, + // so that no more tasks are scheduled. + synchronized (tasksByKey) { + final Deque tasks = tasksByKey.get(key); + if (tasks == null) { + return; + } + while (!tasks.isEmpty()) { + Runnable task = tasks.poll(); + if (task instanceof CancellableRunnable) { + ((CancellableRunnable) task).cancel(e); + } else { + logger.log( + Level.WARNING, + "Attempted to cancel Runnable that was not CancellableRunnable; ignored."); + } + } + } + } }