diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java index 31eb0bdd00f30..d9a21acbd4175 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java @@ -15,15 +15,18 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.client.internal.Requests; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -33,7 +36,9 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -41,6 +46,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.TimeValue; @@ -451,6 +457,40 @@ public void testCorruptionOnNetworkLayer() throws InterruptedException { connection.sendRequest(requestId, action, request, options); }); + final var allocationGivenUpFuture = new PlainActionFuture(); + final var maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); + new ClusterStateObserver( + internalCluster().getCurrentMasterNodeInstance(ClusterService.class), + logger, + new ThreadContext(Settings.EMPTY) + ).waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + allocationGivenUpFuture.onResponse(null); + } + + @Override + public void onClusterServiceClose() { + allocationGivenUpFuture.onFailure(new ElasticsearchException("closed")); + } + + @Override + public void onTimeout(TimeValue timeout) { + allocationGivenUpFuture.onFailure(new ElasticsearchException("timed out")); + } + }, state -> { + final var indexRoutingTable = state.routingTable().index("test"); + for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) { + final var replicaShards = indexRoutingTable.shard(shardId).replicaShards(); + if (replicaShards.isEmpty() + || replicaShards.stream() + .anyMatch(sr -> sr.unassigned() == false || sr.unassignedInfo().getNumFailedAllocations() < maxRetries)) { + return false; + } + } + return true; + }, TimeValue.timeValueSeconds(30)); + // can not allocate on unluckyNode client().admin() .indices() @@ -461,10 +501,10 @@ public void testCorruptionOnNetworkLayer() throws InterruptedException { .put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName()) ) .get(); - ensureYellowAndNoInitializingShards("test"); + allocationGivenUpFuture.actionGet(); assertThatAllShards("test", shard -> { assertThat(shard.primaryShard().currentNodeId(), equalTo(primariesNode.getId())); - assertThat(shard.replicaShards().get(0).state(), not(equalTo(ShardRoutingState.STARTED))); + assertThat(shard.replicaShards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED)); }); // can allocate on any other data node