From 9e376589a69e55460e3fcaacbefc2395d66dc775 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Fri, 24 Apr 2020 13:22:10 -0600 Subject: [PATCH] Fully stop RetryableAction when cancelled (#55614) Currently cancelling the RetryableAction does not stop one last run from being executed. This commit makes a best effort attempt to cancel a scheduled retry and guards future executions from the action already being completed. --- .../action/support/RetryableAction.java | 16 ++++++++++++++-- .../action/support/RetryableActionTests.java | 3 ++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java index d1101a60702ad..aa804ba76d193 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayDeque; @@ -48,6 +49,8 @@ public abstract class RetryableAction { private final ActionListener finalListener; private final String executor; + private volatile Scheduler.ScheduledCancellable retryTask; + public RetryableAction(Logger logger, ThreadPool threadPool, TimeValue initialDelay, TimeValue timeoutValue, ActionListener listener) { this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); @@ -75,6 +78,10 @@ public void run() { public void cancel(Exception e) { if (isDone.compareAndSet(false, true)) { + Scheduler.ScheduledCancellable localRetryTask = this.retryTask; + if (localRetryTask != null) { + localRetryTask.cancel(); + } finalListener.onFailure(e); } } @@ -84,11 +91,16 @@ private Runnable createRunnable(RetryingListener retryingListener) { @Override protected void doRun() { - tryAction(listener); + retryTask = null; + // It is possible that the task was cancelled in between the retry being dispatched and now + if (isDone.get() == false) { + tryAction(listener); + } } @Override public void onRejection(Exception e) { + retryTask = null; // TODO: The only implementations of this class use SAME which means the execution will not be // rejected. Future implementations can adjust this functionality as needed. onFailure(e); @@ -140,7 +152,7 @@ public void onFailure(Exception e) { if (isDone.get() == false) { final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); - threadPool.schedule(runnable, delay, executor); + retryTask = threadPool.schedule(runnable, delay, executor); } } } else { diff --git a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java index 34b57e9431094..ff64efa83cc1a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java @@ -201,7 +201,8 @@ public boolean shouldRetry(Exception e) { retryableAction.cancel(new ElasticsearchException("Cancelled")); taskQueue.runAllRunnableTasks(); - assertEquals(2, executedCount.get()); + // A second run will not occur because it is cancelled + assertEquals(1, executedCount.get()); expectThrows(ElasticsearchException.class, future::actionGet); } }