Skip to content

Commit

Permalink
Fully stop RetryableAction when cancelled (#55614)
Browse files Browse the repository at this point in the history
Currently cancelling the RetryableAction does not stop one last run from
being executed. This commit makes a best effort attempt to cancel a
scheduled retry and guards future executions from the action already
being completed.
  • Loading branch information
Tim-Brooks authored Apr 24, 2020
1 parent cde5fc1 commit f2cf9e9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayDeque;
Expand All @@ -48,6 +49,8 @@ public abstract class RetryableAction<Response> {
private final ActionListener<Response> finalListener;
private final String executor;

private volatile Scheduler.ScheduledCancellable retryTask;

public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue,
ActionListener<Response> listener) {
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
Expand Down Expand Up @@ -75,6 +78,10 @@ public void run() {

public void cancel(Exception e) {
if (isDone.compareAndSet(false, true)) {
Scheduler.ScheduledCancellable localRetryTask = this.retryTask;
if (localRetryTask != null) {
localRetryTask.cancel();
}
finalListener.onFailure(e);
}
}
Expand All @@ -84,11 +91,16 @@ private Runnable createRunnable(RetryingListener retryingListener) {

@Override
protected void doRun() {
tryAction(listener);
retryTask = null;
// It is possible that the task was cancelled in between the retry being dispatched and now
if (isDone.get() == false) {
tryAction(listener);
}
}

@Override
public void onRejection(Exception e) {
retryTask = null;
// TODO: The only implementations of this class use SAME which means the execution will not be
// rejected. Future implementations can adjust this functionality as needed.
onFailure(e);
Expand Down Expand Up @@ -140,7 +152,7 @@ public void onFailure(Exception e) {
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
threadPool.schedule(runnable, delay, executor);
retryTask = threadPool.schedule(runnable, delay, executor);
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public boolean shouldRetry(Exception e) {
retryableAction.cancel(new ElasticsearchException("Cancelled"));
taskQueue.runAllRunnableTasks();

assertEquals(2, executedCount.get());
// A second run will not occur because it is cancelled
assertEquals(1, executedCount.get());
expectThrows(ElasticsearchException.class, future::actionGet);
}
}

0 comments on commit f2cf9e9

Please sign in to comment.