diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java index a8916837b996e..ba57364aa0e3b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayDeque; @@ -48,6 +49,8 @@ public abstract class RetryableAction { private final ActionListener finalListener; private final String executor; + private volatile Scheduler.ScheduledCancellable retryTask; + public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue, ActionListener listener) { this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); @@ -75,6 +78,10 @@ public void run() { public void cancel(Exception e) { if (isDone.compareAndSet(false, true)) { + Scheduler.ScheduledCancellable localRetryTask = this.retryTask; + if (localRetryTask != null) { + localRetryTask.cancel(); + } finalListener.onFailure(e); } } @@ -84,11 +91,16 @@ private Runnable createRunnable(RetryingListener retryingListener) { @Override protected void doRun() { - tryAction(listener); + retryTask = null; + // It is possible that the task was cancelled in between the retry being dispatched and now + if (isDone.get() == false) { + tryAction(listener); + } } @Override public void onRejection(Exception e) { + retryTask = null; // TODO: The only implementations of this class use SAME which means the execution will not be // rejected. Future implementations can adjust this functionality as needed. onFailure(e); @@ -140,7 +152,7 @@ public void onFailure(Exception e) { if (isDone.get() == false) { final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); - threadPool.schedule(runnable, delay, executor); + retryTask = threadPool.schedule(runnable, delay, executor); } } } else { diff --git a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java index 96b523aee56db..22ac8acf2402b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java @@ -201,7 +201,8 @@ public boolean shouldRetry(Exception e) { retryableAction.cancel(new ElasticsearchException("Cancelled")); taskQueue.runAllRunnableTasks(); - assertEquals(2, executedCount.get()); + // A second run will not occur because it is cancelled + assertEquals(1, executedCount.get()); expectThrows(ElasticsearchException.class, future::actionGet); } }