Skip to content

Commit

Permalink
Fix testCorruptionOnNetworkLayer (#88644)
Browse files Browse the repository at this point in the history
`POST _cluster/reroute?retry_failed` doesn't reset the failure counter
on any `INITIALIZING` shards, and waiting for no `INITIALIZING` shards
isn't quite enough to ensure that we've finished all possible retries
because there could instead be an ongoing async fetch.

This commit fixes this using a `ClusterStateObserver` to observe the
retry counter instead of using the cluster health action.

Relates #88314
Closes #88615
  • Loading branch information
DaveCTurner authored Jul 20, 2022
1 parent 0eca582 commit 6bbe32f
Showing 1 changed file with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -33,14 +36,17 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -451,6 +457,40 @@ public void testCorruptionOnNetworkLayer() throws InterruptedException {
connection.sendRequest(requestId, action, request, options);
});

final var allocationGivenUpFuture = new PlainActionFuture<Void>();
final var maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
new ClusterStateObserver(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class),
logger,
new ThreadContext(Settings.EMPTY)
).waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
allocationGivenUpFuture.onResponse(null);
}

@Override
public void onClusterServiceClose() {
allocationGivenUpFuture.onFailure(new ElasticsearchException("closed"));
}

@Override
public void onTimeout(TimeValue timeout) {
allocationGivenUpFuture.onFailure(new ElasticsearchException("timed out"));
}
}, state -> {
final var indexRoutingTable = state.routingTable().index("test");
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
final var replicaShards = indexRoutingTable.shard(shardId).replicaShards();
if (replicaShards.isEmpty()
|| replicaShards.stream()
.anyMatch(sr -> sr.unassigned() == false || sr.unassignedInfo().getNumFailedAllocations() < maxRetries)) {
return false;
}
}
return true;
}, TimeValue.timeValueSeconds(30));

// can not allocate on unluckyNode
client().admin()
.indices()
Expand All @@ -461,10 +501,10 @@ public void testCorruptionOnNetworkLayer() throws InterruptedException {
.put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName())
)
.get();
ensureYellowAndNoInitializingShards("test");
allocationGivenUpFuture.actionGet();
assertThatAllShards("test", shard -> {
assertThat(shard.primaryShard().currentNodeId(), equalTo(primariesNode.getId()));
assertThat(shard.replicaShards().get(0).state(), not(equalTo(ShardRoutingState.STARTED)));
assertThat(shard.replicaShards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED));
});

// can allocate on any other data node
Expand Down

0 comments on commit 6bbe32f

Please sign in to comment.