Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not ignore shard not-available exceptions in replication #28571

Merged
merged 2 commits into from
Feb 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,25 +178,19 @@ public void onResponse(ReplicaResponse response) {

@Override
public void onFailure(Exception replicaException) {
logger.trace(
(org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(),
opType,
shard,
replicaRequest),
replicaException);
if (TransportActions.isShardNotAvailableException(replicaException)) {
decPendingAndFinishIfNeeded();
} else {
logger.trace((org.apache.logging.log4j.util.Supplier<?>) () -> new ParameterizedMessage(
"[{}] failure while performing [{}] on replica {}, request [{}]",
shard.shardId(), opType, shard, replicaRequest), replicaException);
// Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
if (TransportActions.isShardNotAvailableException(replicaException) == false) {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
replicasProxy.failShardIfNeeded(shard, message,
replicaException, ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted, throwable -> decPendingAndFinishIfNeeded());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -54,7 +53,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
Expand Down Expand Up @@ -95,28 +93,26 @@ public void testReplication() throws Exception {

final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards);

final Map<ShardRouting, Exception> expectedFailures = new HashMap<>();
final Set<ShardRouting> expectedFailedShards = new HashSet<>();
final Map<ShardRouting, Exception> simulatedFailures = new HashMap<>();
final Map<ShardRouting, Exception> reportedFailures = new HashMap<>();
for (ShardRouting replica : expectedReplicas) {
if (randomBoolean()) {
Exception t;
boolean criticalFailure = randomBoolean();
if (criticalFailure) {
t = new CorruptIndexException("simulated", (String) null);
reportedFailures.put(replica, t);
} else {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName());
expectedFailures.put(replica, t);
if (criticalFailure) {
expectedFailedShards.add(replica);
}
simulatedFailures.put(replica, t);
}
}

Request request = new Request(shardId);
PlainActionFuture<TestPrimary.Result> listener = new PlainActionFuture<>();
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures);
final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, simulatedFailures);

final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup);
final TestReplicationOperation op = new TestReplicationOperation(request,
Expand All @@ -125,13 +121,13 @@ public void testReplication() throws Exception {

assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards));
assertThat(replicasProxy.failedReplicas, equalTo(simulatedFailures.keySet()));
assertThat(replicasProxy.markedAsStaleCopies, equalTo(staleAllocationIds));
assertTrue("listener is not marked as done", listener.isDone());
ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size()));
assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size()));
assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size()));
assertThat(shardInfo.getFailed(), equalTo(reportedFailures.size()));
assertThat(shardInfo.getFailures(), arrayWithSize(reportedFailures.size()));
assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - simulatedFailures.size()));
final List<ShardRouting> unassignedShards =
indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size();
Expand Down