From 2748ffb80059bfb5db5aa759aaf13a1d3d752d6e Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 7 Feb 2018 14:36:14 -0500 Subject: [PATCH] Do not ignore shard not-available exceptions in replication The shard not-available exceptions are currently ignored in replication as a best effort to avoid failing not-yet-ready shards. However, these exceptions can also happen on fully active shards. If this is the case, we may have skipped important failures from replicas. Since #28049, only fully initialized shards receive write requests. This restriction allows us to handle all exceptions in replication. There is a side-effect with this change: if a replica retries its peer recovery a second time after being tracked in the replication group, it can receive replication requests even though it is not yet ready. That shard may be failed and allocated to another node even though it has a good Lucene index on that node. This PR does not change the way we report replication errors to users, so, as before, the shard not-available exceptions won't be reported. 
--- .../replication/ReplicationOperation.java | 24 +++++++------------ .../ReplicationOperationTests.java | 22 +++++++---------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 817be12a8e2b5..c29ca5c1d0853 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -178,25 +178,19 @@ public void onResponse(ReplicaResponse response) { @Override public void onFailure(Exception replicaException) { - logger.trace( - (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( - "[{}] failure while performing [{}] on replica {}, request [{}]", - shard.shardId(), - opType, - shard, - replicaRequest), - replicaException); - if (TransportActions.isShardNotAvailableException(replicaException)) { - decPendingAndFinishIfNeeded(); - } else { + logger.trace((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( + "[{}] failure while performing [{}] on replica {}, request [{}]", + shard.shardId(), opType, shard, replicaRequest), replicaException); + // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. 
+ if (TransportActions.isShardNotAvailableException(replicaException) == false) { RestStatus restStatus = ExceptionsHelper.status(replicaException); shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } }); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 858cbcce19989..e85c03411f7e2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -54,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -import 
java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; @@ -95,28 +93,26 @@ public void testReplication() throws Exception { final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); - final Map expectedFailures = new HashMap<>(); - final Set expectedFailedShards = new HashSet<>(); + final Map simulatedFailures = new HashMap<>(); + final Map reportedFailures = new HashMap<>(); for (ShardRouting replica : expectedReplicas) { if (randomBoolean()) { Exception t; boolean criticalFailure = randomBoolean(); if (criticalFailure) { t = new CorruptIndexException("simulated", (String) null); + reportedFailures.put(replica, t); } else { t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); } logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName()); - expectedFailures.put(replica, t); - if (criticalFailure) { - expectedFailedShards.add(replica); - } + simulatedFailures.put(replica, t); } } Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); - final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures); + final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); final TestReplicationOperation op = new TestReplicationOperation(request, @@ -125,13 +121,13 @@ public void testReplication() throws Exception { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); - assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards)); + assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet())); 
assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds)); assertTrue("listener is not marked as done", listener.isDone()); ShardInfo shardInfo = listener.actionGet().getShardInfo(); - assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size())); - assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size())); - assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); + assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size())); + assertThat(shardInfo.getFailures(), arrayWithSize(reportedFailures.size())); + assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - simulatedFailures.size())); final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();