Skip to content

Commit

Permalink
[WIP] Refactoring SequentialExecutorService (#4969)
Browse files Browse the repository at this point in the history
* Refactoring SequentialExecutorService

Step 1:
- create a `CallbackExecutor` and `AutoExecutor` as subclasses of SequentialExecutor.

* Adding CallbackExecutor.submit() for encapsulation

* Moving resume into `CallbackExecutor`

* create an abstract class for execute(key, deque)
This removest the last bit of code that directly used the TaskCompleteAction enum.
  • Loading branch information
sduskis authored Apr 17, 2019
1 parent 7732506 commit b68add0
Showing 1 changed file with 112 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

interface CancellableRunnable extends Runnable {
public void cancel(Throwable e);
void cancel(Throwable e);
}

/**
Expand All @@ -42,108 +44,40 @@ interface CancellableRunnable extends Runnable {
final class SequentialExecutorService<T> {
private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName());

private final SequentialExecutor manageableSequentialExecutor;
private final SequentialExecutor autoSequentialExecutor;
private final CallbackExecutor callbackExecutor;
private final AutoExecutor autoExecutor;

SequentialExecutorService(Executor executor) {
this.manageableSequentialExecutor =
SequentialExecutor.newManageableSequentialExecutor(executor);
this.autoSequentialExecutor = SequentialExecutor.newAutoSequentialExecutor(executor);
this.callbackExecutor = new CallbackExecutor(executor);
this.autoExecutor = new AutoExecutor(executor);
}

/**
* Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks
* with the same key that have not been executed will be cancelled.
*/
ApiFuture<T> submit(final String key, final Callable<ApiFuture> callable) {
final SettableApiFuture<T> future = SettableApiFuture.<T>create();
manageableSequentialExecutor.execute(
key,
new CancellableRunnable() {
private boolean cancelled = false;

@Override
public void run() {
if (cancelled) {
return;
}
try {
ApiFuture<T> callResult = callable.call();
ApiFutures.addCallback(
callResult,
new ApiFutureCallback<T>() {
@Override
public void onSuccess(T msg) {
future.set(msg);
manageableSequentialExecutor.resume(key);
}

@Override
public void onFailure(Throwable e) {
future.setException(e);
manageableSequentialExecutor.cancelQueuedTasks(
key,
new CancellationException(
"Execution cancelled because executing previous runnable failed."));
}
});
} catch (Exception e) {
future.setException(e);
}
}

@Override
public void cancel(Throwable e) {
this.cancelled = true;
future.setException(e);
}
});
return future;
return callbackExecutor.submit(key, callable);
}

/** Runs synchronous {@code Runnable} tasks sequentially. */
void submit(final String key, final Runnable runnable) {
autoSequentialExecutor.execute(key, runnable);
void submit( String key, Runnable runnable) {
autoExecutor.execute(key, runnable);
}

/**
* Internal implemenation of SequentialExecutorService. Takes a serial stream of string keys and
* Internal implementation of SequentialExecutorService. Takes a serial stream of string keys and
* {@code Runnable} tasks, and runs the tasks with the same key sequentially. Tasks with the same
* key will be run only when its predecessor has been completed while tasks with different keys
* can be run in parallel.
*/
static class SequentialExecutor {
static abstract class SequentialExecutor {
// Maps keys to tasks.
private final Map<String, Deque<Runnable>> tasksByKey;
private final Executor executor;

enum TaskCompleteAction {
EXECUTE_NEXT_TASK,
WAIT_UNTIL_RESUME,
}

private TaskCompleteAction taskCompleteAction;
protected final Map<String, Deque<Runnable>> tasksByKey;
protected final Executor executor;

/**
* Creates a AutoSequentialExecutor which executes the next queued task automatically when the
* previous task has completed.
*/
static SequentialExecutor newAutoSequentialExecutor(Executor executor) {
return new SequentialExecutor(executor, TaskCompleteAction.EXECUTE_NEXT_TASK);
}

/**
* Creates a ManageableSequentialExecutor which allows users to decide when to execute the next
* queued task. The first queued task is executed immediately, but the following tasks will be
* executed only when {@link #resume(String)} is called explicitly.
*/
static SequentialExecutor newManageableSequentialExecutor(Executor executor) {
return new SequentialExecutor(executor, TaskCompleteAction.WAIT_UNTIL_RESUME);
}

private SequentialExecutor(Executor executor, TaskCompleteAction taskCompleteAction) {
private SequentialExecutor(Executor executor) {
this.executor = executor;
this.taskCompleteAction = taskCompleteAction;
this.tasksByKey = new HashMap<>();
}

Expand All @@ -162,25 +96,11 @@ void execute(final String key, Runnable task) {
tasksByKey.put(key, newTasks);
}

final Deque<Runnable> finalTasks = newTasks;
executor.execute(
new Runnable() {
@Override
public void run() {
switch (taskCompleteAction) {
case EXECUTE_NEXT_TASK:
invokeCallbackAndExecuteNext(key, finalTasks);
break;
case WAIT_UNTIL_RESUME:
invokeCallback(finalTasks);
break;
default:
// Nothing to do.
}
}
});
execute(key, newTasks);
}

protected abstract void execute(String key, Deque<Runnable> finalTasks);

/** Cancels every task in the queue assoicated with {@code key}. */
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
Expand All @@ -203,59 +123,127 @@ void cancelQueuedTasks(final String key, Throwable e) {
}
}

/** Executes the next queued task associated with {@code key}. */
void resume(final String key) {
if (taskCompleteAction.equals(TaskCompleteAction.EXECUTE_NEXT_TASK)) {
// resume() is no-op since tasks are executed automatically.
return;
protected void invokeCallback(final Deque<Runnable> tasks) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
Deque<Runnable> tasks;
}

protected void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
invokeCallback(tasks);
synchronized (tasksByKey) {
tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
if (tasks.isEmpty()) {
// Note that there can be a race if a task is added to `tasks` at this point. However,
// tasks.add() is called only inside the block synchronized by `tasksByKey` object
// in the execute() function. Therefore, we are safe to remove `tasks` here. This is not
// optimal, but correct.
tasksByKey.remove(key);
return;
}
}
final Deque<Runnable> finalTasks = tasks;
// Run the next task.
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallback(finalTasks);
invokeCallbackAndExecuteNext(key, tasks);
}
});
}
}

private void invokeCallback(final Deque<Runnable> tasks) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
private static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

private void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
invokeCallback(tasks);
protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(new Runnable() {
@Override public void run() {
invokeCallbackAndExecuteNext(key, finalTasks);
}
});
}
}

private static class CallbackExecutor extends SequentialExecutor {
CallbackExecutor(Executor executor) {
super(executor);
}

<T> ApiFuture<T> submit(final String key, final Callable<ApiFuture> callable) {
final SettableApiFuture<T> future = SettableApiFuture.create();
execute(
key,
new CancellableRunnable() {
private boolean cancelled = false;

@Override
public void run() {
if (cancelled) {
return;
}
try {
ApiFuture<T> callResult = callable.call();
ApiFutures.addCallback(callResult, new ApiFutureCallback<T>() {
@Override
public void onSuccess(T msg) {
future.set(msg);
resume(key);
}

@Override
public void onFailure(Throwable e) {
future.setException(e);
cancelQueuedTasks(
key,
new CancellationException(
"Execution cancelled because executing previous runnable failed."));
}
}, directExecutor());
} catch (Exception e) {
future.setException(e);
}
}

@Override
public void cancel(Throwable e) {
this.cancelled = true;
future.setException(e);
}
});
return future;
}

protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(new Runnable() {
@Override public void run() {
invokeCallback(finalTasks);
}
});
}

/** Executes the next queued task associated with {@code key}. */
void resume(final String key) {
Deque<Runnable> tasks;
synchronized (tasksByKey) {
tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
if (tasks.isEmpty()) {
// Note that there can be a race if a task is added to `tasks` at this point. However,
// tasks.add() is called only inside the block synchronized by `tasksByKey` object
// in the execute() function. Therefore, we are safe to remove `tasks` here. This is not
// optimal, but correct.
tasksByKey.remove(key);
return;
}
}
final Deque<Runnable> finalTasks = tasks;
// Run the next task.
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, tasks);
invokeCallback(finalTasks);
}
});
}
Expand Down

0 comments on commit b68add0

Please sign in to comment.