diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 817be12a8e2b5..c29ca5c1d0853 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -178,25 +178,19 @@ public void onResponse(ReplicaResponse response) { @Override public void onFailure(Exception replicaException) { - logger.trace( - (org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( - "[{}] failure while performing [{}] on replica {}, request [{}]", - shard.shardId(), - opType, - shard, - replicaRequest), - replicaException); - if (TransportActions.isShardNotAvailableException(replicaException)) { - decPendingAndFinishIfNeeded(); - } else { + logger.trace((org.apache.logging.log4j.util.Supplier) () -> new ParameterizedMessage( + "[{}] failure while performing [{}] on replica {}, request [{}]", + shard.shardId(), opType, shard, replicaRequest), replicaException); + // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report. + if (TransportActions.isShardNotAvailableException(replicaException) == false) { RestStatus restStatus = ExceptionsHelper.status(replicaException); shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure( shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false)); - String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); - replicasProxy.failShardIfNeeded(shard, message, - replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, - ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } + String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard); + replicasProxy.failShardIfNeeded(shard, message, + replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded, + ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded()); } }); } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 858cbcce19989..e85c03411f7e2 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -54,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; @@ -95,28 +93,26 @@ public void testReplication() throws Exception { final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); - final Map expectedFailures = new HashMap<>(); - final Set expectedFailedShards = new HashSet<>(); + final Map simulatedFailures = new HashMap<>(); + final Map reportedFailures = new HashMap<>(); for (ShardRouting replica : expectedReplicas) { if (randomBoolean()) { Exception t; boolean criticalFailure = randomBoolean(); if (criticalFailure) { t = new CorruptIndexException("simulated", (String) null); + reportedFailures.put(replica, t); } else { t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING); } logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName()); - expectedFailures.put(replica, t); - if (criticalFailure) { - expectedFailedShards.add(replica); - } + simulatedFailures.put(replica, t); } } Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); - final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures); + final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures); final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); final TestReplicationOperation op = new TestReplicationOperation(request, @@ -125,13 +121,13 @@ public void testReplication() throws Exception { assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); - assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards)); + assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet())); assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds)); assertTrue("listener is not marked as done", listener.isDone()); ShardInfo shardInfo = listener.actionGet().getShardInfo(); - assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size())); - assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size())); - assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); + assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size())); + assertThat(shardInfo.getFailures(), arrayWithSize(reportedFailures.size())); + assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - simulatedFailures.size())); final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();