Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More refactoring to SequentialExecutor #4984

Merged
merged 1 commit into from
Apr 19, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import java.util.Deque;
import java.util.HashMap;
Expand All @@ -44,14 +45,12 @@ interface CancellableRunnable extends Runnable {
final class SequentialExecutorService {

// This class is not directly usable.
private SequentialExecutorService() {
}
private SequentialExecutorService() {}

/**
* This Executor takes a serial stream of string keys and
* {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same
* key will be run only when its predecessor has been completed while tasks with different keys
* can be run in parallel.
* This Executor takes a serial stream of string keys and {@code Runnable} tasks, and runs the
* tasks with the same key sequentially. Tasks with the same key will be run only when its
* predecessor has been completed while tasks with different keys can be run in parallel.
*/
private abstract static class SequentialExecutor {
// Maps keys to tasks.
Expand All @@ -78,43 +77,44 @@ protected void execute(final String key, Runnable task) {
tasksByKey.put(key, newTasks);
}

execute(key, newTasks);
callNextTaskAsync(key, newTasks);
}

protected abstract void execute(String key, Deque<Runnable> finalTasks);
protected void callNextTaskAsync(final String key, final Deque<Runnable> tasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
postTaskExecution(key, tasks);
}
}
});
}

protected void invokeCallback(final Deque<Runnable> tasks) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
protected void postTaskExecution(String key, Deque<Runnable> tasks) {
// Do nothing in this class, but provide an opportunity for a subclass to do something
// interesting.
}
}

@BetaApi
static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
void submit(String key, Runnable task) {
void submit(String key, Runnable task) {
super.execute(key, task);
}

@Override
protected void execute(final String key, final Deque<Runnable> tasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, tasks);
}
});
}

private void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
invokeCallback(tasks);
/** Once a task is done, automatically run the next task in the queue. */
protected void postTaskExecution(final String key, final Deque<Runnable> tasks) {
synchronized (tasksByKey) {
if (tasks.isEmpty()) {
// Note that there can be a race if a task is added to `tasks` at this point. However,
Expand All @@ -125,16 +125,19 @@ private void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable
return;
}
}
execute(key, tasks);

callNextTaskAsync(key, tasks);
}
}

/**
* Runs asynchronous {@code Callable} tasks sequentially for the same key. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
* Runs asynchronous {@code Callable} tasks sequentially for the same key. If one of the tasks
* fails, other tasks with the same key that have not been executed will be cancelled.
*/
@BetaApi
static class CallbackExecutor extends SequentialExecutor {
private static final Logger logger = Logger.getLogger(SequentialExecutorService.SequentialExecutor.class.getName());
private static final Logger logger =
Logger.getLogger(SequentialExecutorService.SequentialExecutor.class.getName());

CallbackExecutor(Executor executor) {
super(executor);
Expand All @@ -143,8 +146,8 @@ static class CallbackExecutor extends SequentialExecutor {
/**
* Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
* <p>
* This method does the following in a chain:
*
* <p>This method does the following in a chain:
*
* <ol>
* <li>Creates an `ApiFuture` that can be used for tracking progress.
Expand Down Expand Up @@ -227,17 +230,6 @@ public void cancel(Throwable e) {
return future;
}

@Override
protected void execute(final String key, final Deque<Runnable> tasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallback(tasks);
}
});
}

/** Executes the next queued task associated with {@code key}. */
private void resume(String key) {
Deque<Runnable> tasks;
Expand All @@ -251,7 +243,7 @@ private void resume(String key) {
return;
}
}
execute(key, tasks);
callNextTaskAsync(key, tasks);
}

/** Cancels every task in the queue assoicated with {@code key}. */
Expand Down