Skip to content

Commit

Permalink
Merged with the formatting changes. (#4978)
Browse files Browse the repository at this point in the history
  • Loading branch information
sduskis authored Apr 18, 2019
1 parent 7f96c1a commit 6be9e3d
Showing 1 changed file with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,29 @@ void execute(final String key, Runnable task) {

protected abstract void execute(String key, Deque<Runnable> finalTasks);

/** Cancels every task in the queue assoicated with {@code key}. */
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
final Deque<Runnable> tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
while (!tasks.isEmpty()) {
Runnable task = tasks.poll();
if (task instanceof CancellableRunnable) {
((CancellableRunnable) task).cancel(e);
} else {
logger.log(
Level.WARNING,
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
}
}
}
}

protected void invokeCallback(final Deque<Runnable> tasks) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
}
}

private static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, finalTasks);
}
});
}

protected void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
invokeCallback(tasks);
Expand All @@ -153,22 +147,6 @@ public void run() {
}
}

private static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, finalTasks);
}
});
}
}

private static class CallbackExecutor extends SequentialExecutor {
CallbackExecutor(Executor executor) {
super(executor);
Expand Down Expand Up @@ -255,4 +233,26 @@ public void run() {
});
}
}

/** Cancels every task in the queue assoicated with {@code key}. */
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
final Deque<Runnable> tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
while (!tasks.isEmpty()) {
Runnable task = tasks.poll();
if (task instanceof CancellableRunnable) {
((CancellableRunnable) task).cancel(e);
} else {
logger.log(
Level.WARNING,
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
}
}
}
}
}

0 comments on commit 6be9e3d

Please sign in to comment.