diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java index a01fd8e3bc209..eb23244722d0a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java @@ -56,9 +56,9 @@ public static class Request extends AcknowledgedRequest implements ToXC PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES); PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES); PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE); - PARSER.declareField(Request::setRetryTimeout, + PARSER.declareField(Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()), - ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareField(Request::setIdleShardRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()), ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -87,7 +87,7 @@ public static Request fromXContent(XContentParser parser, String remoteClusterAl private Long maxOperationSizeInBytes; private Integer maxConcurrentWriteBatches; private Integer maxWriteBufferSize; - private TimeValue retryTimeout; + private TimeValue maxRetryDelay; private TimeValue idleShardRetryDelay; @Override @@ -166,12 +166,12 @@ public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { this.maxWriteBufferSize = maxWriteBufferSize; } - public TimeValue getRetryTimeout() { - return retryTimeout; + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; } - public void setRetryTimeout(TimeValue retryTimeout) { - this.retryTimeout = retryTimeout; + public void setMaxRetryDelay(TimeValue maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; } public TimeValue getIdleShardRetryDelay() { @@ -193,7 +193,7 @@ public void readFrom(StreamInput in) throws IOException { maxOperationSizeInBytes = in.readOptionalLong(); maxConcurrentWriteBatches = in.readOptionalVInt(); maxWriteBufferSize = in.readOptionalVInt(); - retryTimeout = in.readOptionalTimeValue(); + maxRetryDelay = in.readOptionalTimeValue(); idleShardRetryDelay = in.readOptionalTimeValue(); } @@ -208,7 +208,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalLong(maxOperationSizeInBytes); out.writeOptionalVInt(maxConcurrentWriteBatches); out.writeOptionalVInt(maxWriteBufferSize); - out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(idleShardRetryDelay); } @@ -236,8 +236,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (maxConcurrentWriteBatches != null) { builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); } - if (retryTimeout != null) { - builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + if (maxRetryDelay != null) { + builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); } if (idleShardRetryDelay != null) { builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); @@ -260,7 +260,7 @@ public boolean equals(Object o) { Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) && Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && - Objects.equals(retryTimeout, request.retryTimeout) && + Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay); } @@ -275,7 +275,7 @@ public int hashCode() { maxOperationSizeInBytes, maxConcurrentWriteBatches, maxWriteBufferSize, - retryTimeout, + maxRetryDelay, idleShardRetryDelay ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0a0a6877dc92a..c221c097977a3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.transport.NetworkExceptionHelper; import org.elasticsearch.common.unit.TimeValue; @@ -18,7 +19,6 @@ import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import java.util.ArrayList; @@ -43,11 +43,12 @@ */ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { + private static final int DELAY_MILLIS = 50; private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class); private final String leaderIndex; private final ShardFollowTask params; - private final TimeValue retryTimeout; + private final TimeValue maxRetryDelay; private final TimeValue idleShardChangesRequestDelay; private final BiConsumer scheduler; private final LongSupplier relativeTimeProvider; @@ -79,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { this.params = params; this.scheduler = scheduler; this.relativeTimeProvider = relativeTimeProvider; - this.retryTimeout = params.getRetryTimeout(); + this.maxRetryDelay = params.getMaxRetryDelay(); this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay(); /* * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of @@ -357,20 +358,28 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { assert e != null; - if (shouldRetry(e)) { - if (isStopped() == false && retryCounter.incrementAndGet() <= FollowIndexAction.RETRY_LIMIT) { - LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e); - scheduler.accept(retryTimeout, task); - } else { - markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() + - "] times, aborting...", e)); - } + if (shouldRetry(e) && isStopped() == false) { + int currentRetry = retryCounter.incrementAndGet(); + LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", + params.getFollowShardId(), currentRetry), e); + long delay = computeDelay(currentRetry, maxRetryDelay.getMillis()); + scheduler.accept(TimeValue.timeValueMillis(delay), task); } else { markAsFailed(e); } } - private boolean shouldRetry(Exception e) { + static long computeDelay(int currentRetry, long maxRetryDelayInMillis) { + // Cap currentRetry to avoid overflow when computing n variable + int maxCurrentRetry = Math.min(currentRetry, 24); + long n = Math.round(Math.pow(2, maxCurrentRetry - 1)); + // + 1 here, because nextInt(...) bound is exclusive and otherwise the first delay would always be zero. + int k = Randomness.get().nextInt(Math.toIntExact(n + 1)); + int backOffDelay = k * DELAY_MILLIS; + return Math.min(backOffDelay, maxRetryDelayInMillis); + } + + private static boolean shouldRetry(Exception e) { return NetworkExceptionHelper.isConnectException(e) || NetworkExceptionHelper.isCloseConnectionException(e) || TransportActions.isShardNotAvailableException(e); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 82482792f3907..9da19cb1998d7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes"); public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); - public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); + public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); @SuppressWarnings("unchecked") @@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES); PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE); PARSER.declareField(ConstructingObjectParser.constructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()), - RETRY_TIMEOUT, ObjectParser.ValueType.STRING); + (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), + MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); @@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final long maxBatchSizeInBytes; private final int maxConcurrentWriteBatches; private final int maxWriteBufferSize; - private final TimeValue retryTimeout; + private final TimeValue maxRetryDelay; private final TimeValue idleShardRetryDelay; private final Map headers; ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount, int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches, - int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map headers) { + int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxBatchSizeInBytes = maxBatchSizeInBytes; this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; this.maxWriteBufferSize = maxWriteBufferSize; - this.retryTimeout = retryTimeout; + this.maxRetryDelay = maxRetryDelay; this.idleShardRetryDelay = idleShardRetryDelay; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException { this.maxBatchSizeInBytes = in.readVLong(); this.maxConcurrentWriteBatches = in.readVInt(); this.maxWriteBufferSize = in.readVInt(); - this.retryTimeout = in.readTimeValue(); + this.maxRetryDelay = in.readTimeValue(); this.idleShardRetryDelay = in.readTimeValue(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() { return maxBatchSizeInBytes; } - public TimeValue getRetryTimeout() { - return retryTimeout; + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; } public TimeValue getIdleShardRetryDelay() { @@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(maxBatchSizeInBytes); out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); - out.writeTimeValue(retryTimeout); + out.writeTimeValue(maxRetryDelay); out.writeTimeValue(idleShardRetryDelay); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); - builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); @@ -229,7 +229,7 @@ public boolean equals(Object o) { maxConcurrentWriteBatches == that.maxConcurrentWriteBatches && maxBatchSizeInBytes == that.maxBatchSizeInBytes && maxWriteBufferSize == that.maxWriteBufferSize && - Objects.equals(retryTimeout, that.retryTimeout) && + Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && Objects.equals(headers, that.headers); } @@ -237,7 +237,7 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, - maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers); + maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index 33447ef420800..3128a63f24bc2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -175,7 +175,7 @@ void start( request.getMaxOperationSizeInBytes(), request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), - request.getRetryTimeout(), + request.getMaxRetryDelay(), request.getIdleShardRetryDelay(), filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index a4ff9511cfbd8..0824dea67f697 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -149,7 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, request.getMaxOperationSizeInBytes(), request.getMaxConcurrentWriteBatches(), request.getMaxWriteBufferSize(), - request.getRetryTimeout(), + request.getMaxRetryDelay(), request.getIdleShardRetryDelay() ); patterns.put(request.getLeaderClusterAlias(), autoFollowPattern); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java index a4808e428feca..f4bd8a69e3f5d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java @@ -131,7 +131,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception { request.setMaxOperationSizeInBytes(randomNonNegativeLong()); } if (randomBoolean()) { - request.setRetryTimeout(TimeValue.timeValueMillis(500)); + request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); @@ -162,8 +162,8 @@ public void testAutoFollowParameterAreDelegated() throws Exception { if (request.getMaxOperationSizeInBytes() != null) { assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes())); } - if (request.getRetryTimeout() != null) { - assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout())); + if (request.getMaxRetryDelay() != null) { + assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); } if (request.getIdleShardRetryDelay() != null) { assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java index 27760578db945..d6dad3b019ca3 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java @@ -41,7 +41,7 @@ protected PutAutoFollowPatternAction.Request createTestInstance() { request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { - request.setRetryTimeout(TimeValue.timeValueMillis(500)); + request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); } if (randomBoolean()) { request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index e177f77e61377..e25d95538b2f5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -30,12 +30,13 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.sameInstance; public class ShardFollowNodeTaskTests extends ESTestCase { @@ -177,7 +178,7 @@ public void testReceiveRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - int max = randomIntBetween(1, 10); + int max = randomIntBetween(1, 30); for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } @@ -223,59 +224,6 @@ public void testReceiveRetryableError() { assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); } - public void testReceiveRetryableErrorRetriedTooManyTimes() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 63, -1); - - int max = randomIntBetween(11, 32); - for (int i = 0; i < max; i++) { - readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); - } - final AtomicLong retryCounter = new AtomicLong(); - // before each retry, we assert the fetch failures; after the last retry, the fetch failure should persist - beforeSendShardChangesRequest = status -> { - assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get())); - if (retryCounter.get() > 0) { - assertThat(status.fetchExceptions().entrySet(), hasSize(1)); - final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); - assertThat(entry.getKey(), equalTo(0L)); - assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); - assertNotNull(entry.getValue().getCause()); - assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); - final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); - assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); - assertThat(cause.getShardId().getId(), equalTo(0)); - } - retryCounter.incrementAndGet(); - }; - task.coordinateReads(); - - assertThat(shardChangesRequests.size(), equalTo(11)); - for (long[] shardChangesRequest : shardChangesRequests) { - assertThat(shardChangesRequest[0], equalTo(0L)); - assertThat(shardChangesRequest[1], equalTo(64L)); - } - - assertTrue("task is stopped", task.isStopped()); - assertThat(fatalError, notNullValue()); - assertThat(fatalError.getMessage(), containsString("retrying failed [")); - ShardFollowNodeTaskStatus status = task.getStatus(); - assertThat(status.numberOfConcurrentReads(), equalTo(1)); - assertThat(status.numberOfConcurrentWrites(), equalTo(0)); - assertThat(status.numberOfFailedFetches(), equalTo(11L)); - assertThat(status.fetchExceptions().entrySet(), hasSize(1)); - final Map.Entry entry = status.fetchExceptions().entrySet().iterator().next(); - assertThat(entry.getKey(), equalTo(0L)); - assertThat(entry.getValue(), instanceOf(ElasticsearchException.class)); - assertNotNull(entry.getValue().getCause()); - assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class)); - final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause(); - assertThat(cause.getShardId().getIndexName(), equalTo("leader_index")); - assertThat(cause.getShardId().getId(), equalTo(0)); - assertThat(status.lastRequestedSeqNo(), equalTo(63L)); - assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); - } - public void testReceiveNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -455,7 +403,7 @@ public void testMappingUpdateRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - int max = randomIntBetween(1, 10); + int max = randomIntBetween(1, 30); for (int i = 0; i < max; i++) { mappingUpdateFailures.add(new ConnectException()); } @@ -476,31 +424,6 @@ public void testMappingUpdateRetryableError() { } - public void testMappingUpdateRetryableErrorRetriedTooManyTimes() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 63, -1); - - int max = randomIntBetween(11, 20); - for (int i = 0; i < max; i++) { - mappingUpdateFailures.add(new ConnectException()); - } - mappingVersions.add(1L); - task.coordinateReads(); - ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); - task.handleReadResponse(0L, 64L, response); - - assertThat(mappingUpdateFailures.size(), equalTo(max - 11)); - assertThat(mappingVersions.size(), equalTo(1)); - assertThat(bulkShardOperationRequests.size(), equalTo(0)); - assertThat(task.isStopped(), equalTo(true)); - ShardFollowNodeTaskStatus status = task.getStatus(); - assertThat(status.mappingVersion(), equalTo(0L)); - assertThat(status.numberOfConcurrentReads(), equalTo(1)); - assertThat(status.numberOfConcurrentWrites(), equalTo(0)); - assertThat(status.lastRequestedSeqNo(), equalTo(63L)); - assertThat(status.leaderGlobalCheckpoint(), equalTo(63L)); - } - public void testMappingUpdateNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -597,7 +520,7 @@ public void testRetryableError() { assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - int max = randomIntBetween(1, 10); + int max = randomIntBetween(1, 30); for (int i = 0; i < max; i++) { writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } @@ -616,34 +539,6 @@ public void testRetryableError() { assertThat(status.followerGlobalCheckpoint(), equalTo(-1L)); } - public void testRetryableErrorRetriedTooManyTimes() { - ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); - startTask(task, 63, -1); - - task.coordinateReads(); - assertThat(shardChangesRequests.size(), equalTo(1)); - assertThat(shardChangesRequests.get(0)[0], equalTo(0L)); - assertThat(shardChangesRequests.get(0)[1], equalTo(64L)); - - int max = randomIntBetween(11, 32); - for (int i = 0; i < max; i++) { - writeFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); - } - ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 0L, 643); - // Also invokes coordinatesWrites() - task.innerHandleReadResponse(0L, 63L, response); - - // Number of requests is equal to initial request + retried attempts: - assertThat(bulkShardOperationRequests.size(), equalTo(11)); - for (List operations : bulkShardOperationRequests) { - assertThat(operations, equalTo(Arrays.asList(response.getOperations()))); - } - assertThat(task.isStopped(), equalTo(true)); - ShardFollowNodeTaskStatus status = task.getStatus(); - assertThat(status.numberOfConcurrentWrites(), equalTo(1)); - assertThat(status.followerGlobalCheckpoint(), equalTo(-1L)); - } - public void testNonRetryableError() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); @@ -712,8 +607,25 @@ public void testHandleWriteResponse() { assertThat(status.followerGlobalCheckpoint(), equalTo(63L)); } - ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxConcurrentReadBatches, int maxConcurrentWriteBatches, - int bufferWriteLimit, long maxBatchSizeInBytes) { + public void testComputeDelay() { + long maxDelayInMillis = 1000; + assertThat(ShardFollowNodeTask.computeDelay(0, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(50L))); + assertThat(ShardFollowNodeTask.computeDelay(1, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(50L))); + assertThat(ShardFollowNodeTask.computeDelay(2, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(100L))); + assertThat(ShardFollowNodeTask.computeDelay(3, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(200L))); + assertThat(ShardFollowNodeTask.computeDelay(4, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(400L))); + assertThat(ShardFollowNodeTask.computeDelay(5, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(800L))); + assertThat(ShardFollowNodeTask.computeDelay(6, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); + assertThat(ShardFollowNodeTask.computeDelay(7, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); + assertThat(ShardFollowNodeTask.computeDelay(8, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); + assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); + } + + private ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, + int maxConcurrentReadBatches, + int maxConcurrentWriteBatches, + int bufferWriteLimit, + long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java index c42ef8db9c1fe..2c311356d4943 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java @@ -33,7 +33,6 @@ public final class FollowIndexAction extends Action { public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; - public static final int RETRY_LIMIT = 10; public static final TimeValue DEFAULT_RETRY_TIMEOUT = new TimeValue(500); public static final TimeValue DEFAULT_IDLE_SHARD_RETRY_DELAY = TimeValue.timeValueSeconds(10); @@ -55,7 +54,7 @@ public static class Request extends ActionRequest implements ToXContentObject { private static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes"); private static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches"); private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); - private static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout"); + private static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); private static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, (args, followerIndex) -> { @@ -76,8 +75,8 @@ public static class Request extends ActionRequest implements ToXContentObject { PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), - (p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()), - RETRY_TIMEOUT, + (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()), + MAX_RETRY_DELAY, ObjectParser.ValueType.STRING); PARSER.declareField( ConstructingObjectParser.optionalConstructorArg(), @@ -143,10 +142,10 @@ public int getMaxWriteBufferSize() { return maxWriteBufferSize; } - private TimeValue retryTimeout; + private TimeValue maxRetryDelay; - public TimeValue getRetryTimeout() { - return retryTimeout; + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; } private TimeValue idleShardRetryDelay; @@ -163,7 +162,7 @@ public Request( final Long maxOperationSizeInBytes, final Integer maxConcurrentWriteBatches, final Integer maxWriteBufferSize, - final TimeValue retryTimeout, + final TimeValue maxRetryDelay, final TimeValue idleShardRetryDelay) { if (leaderIndex == null) { @@ -203,7 +202,7 @@ public Request( throw new IllegalArgumentException(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0"); } - final TimeValue actualRetryTimeout = retryTimeout == null ? DEFAULT_RETRY_TIMEOUT : retryTimeout; + final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_RETRY_TIMEOUT : maxRetryDelay; final TimeValue actualIdleShardRetryDelay = idleShardRetryDelay == null ? DEFAULT_IDLE_SHARD_RETRY_DELAY : idleShardRetryDelay; this.leaderIndex = leaderIndex; @@ -213,7 +212,7 @@ public Request( this.maxOperationSizeInBytes = actualMaxOperationSizeInBytes; this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches; this.maxWriteBufferSize = actualMaxWriteBufferSize; - this.retryTimeout = actualRetryTimeout; + this.maxRetryDelay = actualRetryTimeout; this.idleShardRetryDelay = actualIdleShardRetryDelay; } @@ -236,7 +235,7 @@ public void readFrom(final StreamInput in) throws IOException { maxOperationSizeInBytes = in.readVLong(); maxConcurrentWriteBatches = in.readVInt(); maxWriteBufferSize = in.readVInt(); - retryTimeout = in.readOptionalTimeValue(); + maxRetryDelay = in.readOptionalTimeValue(); idleShardRetryDelay = in.readOptionalTimeValue(); } @@ -250,7 +249,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVLong(maxOperationSizeInBytes); out.writeVInt(maxConcurrentWriteBatches); out.writeVInt(maxWriteBufferSize); - out.writeOptionalTimeValue(retryTimeout); + out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(idleShardRetryDelay); } @@ -265,7 +264,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep()); + builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); } builder.endObject(); @@ -282,7 +281,7 @@ public boolean equals(final Object o) { maxOperationSizeInBytes == request.maxOperationSizeInBytes && maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && maxWriteBufferSize == request.maxWriteBufferSize && - Objects.equals(retryTimeout, request.retryTimeout) && + Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay) && Objects.equals(leaderIndex, request.leaderIndex) && Objects.equals(followerIndex, request.followerIndex); @@ -298,7 +297,7 @@ public int hashCode() { maxOperationSizeInBytes, maxConcurrentWriteBatches, maxWriteBufferSize, - retryTimeout, + maxRetryDelay, idleShardRetryDelay ); }