diff --git a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java index c984447c26760..5ff9c2d05fef0 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/RetryableAction.java @@ -103,12 +103,12 @@ public void onRejection(Exception e) { public abstract boolean shouldRetry(Exception e); - protected long calculateDelay(long previousDelay) { - return Math.min(previousDelay * 2, Integer.MAX_VALUE); + protected long calculateDelayBound(long previousDelayBound) { + return Math.min(previousDelayBound * 2, Integer.MAX_VALUE); } protected long minimumDelayMillis() { - return 1L; + return 0L; } public void onFinished() { @@ -145,10 +145,12 @@ public void onFailure(Exception e) { } else { addException(e); - final long nextDelayMillisBound = calculateDelay(delayMillisBound); + final long nextDelayMillisBound = calculateDelayBound(delayMillisBound); final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions); final Runnable runnable = createRunnable(retryingListener); - final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + minimumDelayMillis(); + int range = Math.toIntExact((delayMillisBound + 1) / 2); + final long delayMillis = Randomness.get().nextInt(range) + delayMillisBound - range + 1L; + assert delayMillis > 0; if (isDone.get() == false) { final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); diff --git a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java index b5e12b43eef1e..88a68ba7aa4f4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RetryableActionTests.java @@ -105,7 +105,7 @@ public void testRetryableActionTimeout() { final AtomicInteger retryCount = new AtomicInteger(); final PlainActionFuture future = PlainActionFuture.newFuture(); final RetryableAction retryableAction = new RetryableAction(logger, taskQueue.getThreadPool(), - TimeValue.timeValueMillis(10), TimeValue.timeValueSeconds(1), future) { + TimeValue.timeValueMillis(randomFrom(1, 10, randomIntBetween(100, 2000))), TimeValue.timeValueSeconds(1), future) { @Override public void tryAction(ActionListener listener) { @@ -122,6 +122,7 @@ public boolean shouldRetry(Exception e) { return e instanceof EsRejectedExecutionException; } }; + long begin = taskQueue.getCurrentTimeMillis(); retryableAction.run(); taskQueue.runAllRunnableTasks(); long previousDeferredTime = 0; @@ -136,6 +137,10 @@ public boolean shouldRetry(Exception e) { assertFalse(taskQueue.hasRunnableTasks()); expectThrows(EsRejectedExecutionException.class, future::actionGet); + + long end = taskQueue.getCurrentTimeMillis(); + // max 3x timeout since we minimum wait half the bound for every retry. + assertThat(end - begin, lessThanOrEqualTo(3000L)); } public void testTimeoutOfZeroMeansNoRetry() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java index 368f5d81fb266..f5750fcb00931 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/persistence/ResultsPersisterService.java @@ -390,7 +390,6 @@ private abstract class MlRetryableAction extends RetryableAct final Consumer msgHandler; final BiConsumer> action; volatile int currentAttempt = 0; - volatile long currentMin = MIN_RETRY_SLEEP_MILLIS; volatile long currentMax = MIN_RETRY_SLEEP_MILLIS; MlRetryableAction(String jobId, @@ -453,17 +452,10 @@ public boolean shouldRetry(Exception e) { } @Override - protected long calculateDelay(long previousDelay) { - // Since we exponentially increase, we don't want force randomness to have an excessively long sleep - if (currentMax < MAX_RETRY_SLEEP_MILLIS) { - currentMin = currentMax; - } + protected long calculateDelayBound(long previousDelayBound) { // Exponential backoff calculation taken from: https://en.wikipedia.org/wiki/Exponential_backoff int uncappedBackoff = ((1 << Math.min(currentAttempt, MAX_RETRY_EXPONENT)) - 1) * (50); currentMax = Math.min(uncappedBackoff, MAX_RETRY_SLEEP_MILLIS); - // Its good to have a random window along the exponentially increasing curve - // so that not all bulk requests rest for the same amount of time - int randBound = (int)(1 + (currentMax - currentMin)); String msg = new ParameterizedMessage( "failed to {} after [{}] attempts. Will attempt again.", getName(), @@ -471,12 +463,10 @@ protected long calculateDelay(long previousDelay) { .getFormattedMessage(); LOGGER.warn(() -> new ParameterizedMessage("[{}] {}", jobId, msg)); msgHandler.accept(msg); - return randBound; - } - - @Override - protected long minimumDelayMillis() { - return currentMin; + // RetryableAction randomizes in the interval [currentMax/2 ; currentMax]. + // Its good to have a random window along the exponentially increasing curve + // so that not all bulk requests rest for the same amount of time + return currentMax; } @Override