diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java
index b7d1d1ebab06c..31eb0bdd00f30 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/index/store/CorruptedFileIT.java
@@ -16,19 +16,18 @@
 import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
 import org.elasticsearch.client.internal.Requests;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardIterator;
 import org.elasticsearch.cluster.routing.ShardRouting;
@@ -86,7 +85,10 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toCollection;
 import static org.elasticsearch.common.util.CollectionUtils.iterableAsArrayList;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
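The two new Collectors static imports exist to support the collect-then-shuffle idiom used by the getShuffledDataNodes() helper added at the bottom of this file. A minimal standalone sketch of that idiom, assuming the usual java.util imports and hypothetical node names; random() is the seeded Random provided by the ESTestCase base class, which keeps the shuffle reproducible per test seed:

    // collect into a mutable ArrayList, then shuffle it in place before returning it
    List<String> shuffled = Stream.of("node-1", "node-2", "node-3")
        .collect(collectingAndThen(toCollection(ArrayList::new), list -> {
            Collections.shuffle(list, random());
            return list;
        }));
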
@@ -127,7 +129,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
     /**
      * Tests that we can actually recover from a corruption on the primary given that we have replica shards around.
      */
-    public void testCorruptFileAndRecover() throws ExecutionException, InterruptedException, IOException {
+    public void testCorruptFileAndRecover() throws InterruptedException, IOException {
         int numDocs = scaledRandomIntBetween(100, 1000);
         // have enough space for 3 copies
         internalCluster().ensureAtLeastNumDataNodes(3);
@@ -161,8 +163,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
         // we have to flush at least once here since we don't corrupt the translog
-        SearchResponse countResponse = client().prepareSearch().setSize(0).get();
-        assertHitCount(countResponse, numDocs);
+        assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
 
         final int numShards = numShards("test");
         ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
@@ -193,8 +194,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
         assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));
         final int numIterations = scaledRandomIntBetween(5, 20);
         for (int i = 0; i < numIterations; i++) {
-            SearchResponse response = client().prepareSearch().setSize(numDocs).get();
-            assertHitCount(response, numDocs);
+            assertHitCount(client().prepareSearch().setSize(numDocs).get(), numDocs);
         }
 
         /*
@@ -279,8 +279,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
         // we have to flush at least once here since we don't corrupt the translog
-        SearchResponse countResponse = client().prepareSearch().setSize(0).get();
-        assertHitCount(countResponse, numDocs);
+        assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
 
         ShardRouting shardRouting = corruptRandomPrimaryFile();
         /*
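The recurring edit in the hunks above and below is the same micro-refactor: assertHitCount accepts the SearchResponse directly, so the temporary local can be inlined and the SearchResponse import dropped. The shape of the change, isolated:

    // before: the response is bound to a local just to assert on it
    SearchResponse countResponse = client().prepareSearch().setSize(0).get();
    assertHitCount(countResponse, numDocs);

    // after: pass the response straight through
    assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
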
@@ -336,41 +335,33 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
      * This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization
      * we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster
      */
-    public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException {
+    public void testCorruptionOnNetworkLayerFinalizingRecovery() throws InterruptedException {
         internalCluster().ensureAtLeastNumDataNodes(2);
-        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
-        List<NodeStats> dataNodeStats = new ArrayList<>();
-        for (NodeStats stat : nodeStats.getNodes()) {
-            if (stat.getNode().canContainData()) {
-                dataNodeStats.add(stat);
-            }
-        }
-        assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
-        Collections.shuffle(dataNodeStats, random());
-        NodeStats primariesNode = dataNodeStats.get(0);
-        NodeStats unluckyNode = dataNodeStats.get(1);
+        var dataNodes = getShuffledDataNodes();
+
+        var primariesNode = dataNodes.get(0);
+        var unluckyNode = dataNodes.get(1);
 
         assertAcked(
             prepareCreate("test").setSettings(
                 Settings.builder()
                     .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
                     .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                    .put("index.routing.allocation.include._name", primariesNode.getNode().getName())
+                    .put("index.routing.allocation.include._name", primariesNode.getName())
                     .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
                     .put("index.allocation.max_retries", Integer.MAX_VALUE) // keep on retrying
-
             )
         );
         ensureGreen(); // allocated with empty commit
         final AtomicBoolean corrupt = new AtomicBoolean(true);
         final CountDownLatch hasCorrupted = new CountDownLatch(1);
-        for (NodeStats dataNode : dataNodeStats) {
+        for (var dataNode : dataNodes) {
             MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
                 TransportService.class,
-                dataNode.getNode().getName()
+                dataNode.getName()
             ));
             mockTransportService.addSendBehavior(
-                internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
+                internalCluster().getInstance(TransportService.class, unluckyNode.getName()),
                 (connection, requestId, action, request, options) -> {
                     if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
                         RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
@@ -386,7 +377,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
 
         Settings build = Settings.builder()
             .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
-            .put("index.routing.allocation.include._name", primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName())
+            .put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName())
             .build();
         client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
         client().admin().cluster().prepareReroute().get();
@@ -399,24 +390,13 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc
      * Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way
      * to the replica. The file on disk stays uncorrupted
      */
-    public void testCorruptionOnNetworkLayer() throws ExecutionException, InterruptedException {
+    public void testCorruptionOnNetworkLayer() throws InterruptedException {
         int numDocs = scaledRandomIntBetween(100, 1000);
-        internalCluster().ensureAtLeastNumDataNodes(2);
-        if (cluster().numDataNodes() < 3) {
-            internalCluster().startDataOnlyNode();
-        }
-        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
-        List<NodeStats> dataNodeStats = new ArrayList<>();
-        for (NodeStats stat : nodeStats.getNodes()) {
-            if (stat.getNode().canContainData()) {
-                dataNodeStats.add(stat);
-            }
-        }
+        internalCluster().ensureAtLeastNumDataNodes(3);
 
-        assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
-        Collections.shuffle(dataNodeStats, random());
-        NodeStats primariesNode = dataNodeStats.get(0);
-        NodeStats unluckyNode = dataNodeStats.get(1);
+        var dataNodes = getShuffledDataNodes();
+        var primariesNode = dataNodes.get(0);
+        var unluckyNode = dataNodes.get(1);
 
         assertAcked(
             prepareCreate("test").setSettings(
@@ -425,7 +405,7 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
                     .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 4)) // don't go crazy here, it must recover fast
                     // This does corrupt files on the replica, so we can't check:
                     .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
-                    .put("index.routing.allocation.include._name", primariesNode.getNode().getName())
+                    .put("index.routing.allocation.include._name", primariesNode.getName())
                     .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
             )
         );
@@ -438,83 +418,86 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
         ensureGreen();
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
         // we have to flush at least once here since we don't corrupt the translog
-        SearchResponse countResponse = client().prepareSearch().setSize(0).get();
-        assertHitCount(countResponse, numDocs);
-        final boolean truncate = randomBoolean();
-        for (NodeStats dataNode : dataNodeStats) {
-            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
-                TransportService.class,
-                dataNode.getNode().getName()
-            ));
-            mockTransportService.addSendBehavior(
-                internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
-                (connection, requestId, action, request, options) -> {
-                    if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
-                        RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
-                        if (truncate && req.length() > 1) {
-                            BytesRef bytesRef = req.content().toBytesRef();
-                            BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
-                            request = new RecoveryFileChunkRequest(
-                                req.recoveryId(),
-                                req.requestSeqNo(),
-                                req.shardId(),
-                                req.metadata(),
-                                req.position(),
-                                ReleasableBytesReference.wrap(array),
-                                req.lastChunk(),
-                                req.totalTranslogOps(),
-                                req.sourceThrottleTimeInNanos()
-                            );
-                        } else {
-                            assert req.content().toBytesRef().bytes == req.content().toBytesRef().bytes : "no internal reference!!";
-                            final byte[] array = req.content().toBytesRef().bytes;
-                            int i = randomIntBetween(0, req.content().length() - 1);
-                            array[i] = (byte) ~array[i]; // flip one byte in the content
-                        }
-                    }
-                    connection.sendRequest(requestId, action, request, options);
-                }
-            );
-        }
+        assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
 
-        Settings build = Settings.builder()
-            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
-            .put("index.routing.allocation.include._name", "*")
-            .build();
-        client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
-        client().admin().cluster().prepareReroute().get();
-        ClusterHealthResponse actionGet = client().admin()
-            .cluster()
-            .health(Requests.clusterHealthRequest("test").waitForGreenStatus())
-            .actionGet();
-        if (actionGet.isTimedOut()) {
-            logger.info(
-                "ensureGreen timed out, cluster state:\n{}\n{}",
-                client().admin().cluster().prepareState().get().getState(),
-                client().admin().cluster().preparePendingClusterTasks().get()
-            );
-            assertThat("timed out waiting for green state", actionGet.isTimedOut(), equalTo(false));
-        }
-        // we are green so primaries got not corrupted.
-        // ensure that no shard is actually allocated on the unlucky node
-        ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
-        final IndexRoutingTable indexRoutingTable = clusterStateResponse.getState().getRoutingTable().index("test");
-        for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
-            final IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
-            for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
-                final ShardRouting routing = indexShardRoutingTable.shard(copy);
-                if (unluckyNode.getNode().getId().equals(routing.currentNodeId())) {
-                    assertThat(routing.state(), not(equalTo(ShardRoutingState.STARTED)));
-                    assertThat(routing.state(), not(equalTo(ShardRoutingState.RELOCATING)));
+        var source = (MockTransportService) internalCluster().getInstance(TransportService.class, primariesNode.getName());
+        var target = internalCluster().getInstance(TransportService.class, unluckyNode.getName());
+
+        final boolean truncate = randomBoolean();
+        source.addSendBehavior(target, (connection, requestId, action, request, options) -> {
+            if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
+                RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
+                if (truncate && req.length() > 1) {
+                    BytesRef bytesRef = req.content().toBytesRef();
+                    BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1);
+                    request = new RecoveryFileChunkRequest(
+                        req.recoveryId(),
+                        req.requestSeqNo(),
+                        req.shardId(),
+                        req.metadata(),
+                        req.position(),
+                        ReleasableBytesReference.wrap(array),
+                        req.lastChunk(),
+                        req.totalTranslogOps(),
+                        req.sourceThrottleTimeInNanos()
+                    );
+                } else {
+                    assert req.content().toBytesRef().bytes == req.content().toBytesRef().bytes : "no internal reference!!";
+                    final byte[] array = req.content().toBytesRef().bytes;
+                    int i = randomIntBetween(0, req.content().length() - 1);
+                    array[i] = (byte) ~array[i]; // flip one byte in the content
                 }
             }
-        }
+            connection.sendRequest(requestId, action, request, options);
+        });
+
+        // cannot allocate on unluckyNode
+        client().admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
+                    .put("index.routing.allocation.include._name", primariesNode.getName() + "," + unluckyNode.getName())
+            )
+            .get();
+        ensureYellowAndNoInitializingShards("test");
+        assertThatAllShards("test", shard -> {
+            assertThat(shard.primaryShard().currentNodeId(), equalTo(primariesNode.getId()));
+            assertThat(shard.replicaShards().get(0).state(), not(equalTo(ShardRoutingState.STARTED)));
+        });
+
+        // can allocate on any other data node
+        client().admin()
+            .indices()
+            .prepareUpdateSettings("test")
+            .setSettings(
+                Settings.builder()
+                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
+                    .putNull("index.routing.allocation.include._name")
+                    .put("index.routing.allocation.exclude._name", unluckyNode.getName())
+            )
+            .get();
+        client().admin().cluster().prepareReroute().setRetryFailed(true).get();
+        ensureGreen("test");
+        assertThatAllShards("test", shard -> {
+            assertThat(shard.primaryShard().currentNodeId(), not(equalTo(unluckyNode.getId())));
+            assertThat(shard.replicaShards().get(0).state(), equalTo(ShardRoutingState.STARTED));
+            assertThat(shard.replicaShards().get(0).currentNodeId(), not(equalTo(unluckyNode.getId())));
+        });
+
         final int numIterations = scaledRandomIntBetween(5, 20);
         for (int i = 0; i < numIterations; i++) {
-            SearchResponse response = client().prepareSearch().setSize(numDocs).get();
-            assertHitCount(response, numDocs);
+            assertHitCount(client().prepareSearch().setSize(numDocs).get(), numDocs);
         }
+    }
 
+    private void assertThatAllShards(String index, Consumer<IndexShardRoutingTable> verifier) {
+        var clusterStateResponse = client().admin().cluster().state(new ClusterStateRequest().routingTable(true)).actionGet();
+        var indexRoutingTable = clusterStateResponse.getState().getRoutingTable().index(index);
+        for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
+            verifier.accept(indexRoutingTable.shard(shardId));
+        }
     }
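The rewritten test now verifies allocation in two phases instead of waiting for green behind a wildcard include filter: while the include filter names only the primaries node and the unlucky node, every replica recovery hits the corrupting send behavior, so the index can reach at most yellow and the replica must not be STARTED; once the include filter is cleared and the unlucky node excluded, the reroute with setRetryFailed(true) resets the exhausted allocation retries and the replica can start on a healthy node. A hypothetical additional call site for the new assertThatAllShards helper, assuming the same test class (sketch only, not part of the patch):

    assertThatAllShards("test", shard -> {
        // no copy of any shard may remain on the excluded node
        assertThat(shard.primaryShard().currentNodeId(), not(equalTo(unluckyNode.getId())));
        for (ShardRouting replica : shard.replicaShards()) {
            assertThat(replica.currentNodeId(), not(equalTo(unluckyNode.getId())));
        }
    });
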
 
     /**
@@ -522,7 +505,7 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte
      * TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
      * parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
      */
-    public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
+    public void testCorruptFileThenSnapshotAndRestore() throws InterruptedException, IOException {
         int numDocs = scaledRandomIntBetween(100, 1000);
         internalCluster().ensureAtLeastNumDataNodes(2);
@@ -546,8 +529,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I
         ensureGreen();
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
         // we have to flush at least once here since we don't corrupt the translog
-        SearchResponse countResponse = client().prepareSearch().setSize(0).get();
-        assertHitCount(countResponse, numDocs);
+        assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
 
         ShardRouting shardRouting = corruptRandomPrimaryFile(false);
         logger.info("--> shard {} has a corrupted file", shardRouting);
@@ -618,8 +600,7 @@ public void testReplicaCorruption() throws Exception {
         ensureGreen();
         assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
         // we have to flush at least once here since we don't corrupt the translog
-        SearchResponse countResponse = client().prepareSearch().setSize(0).get();
-        assertHitCount(countResponse, numDocs);
+        assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);
 
         // disable allocations of replicas post restart (the restart will change replicas to primaries, so we have
         // to capture replicas post restart)
@@ -770,4 +751,16 @@ public List<Path> listShardFiles(ShardRouting routing) throws IOException {
         }
         return files;
     }
+
+    private List<DiscoveryNode> getShuffledDataNodes() {
+        var response = client().admin().cluster().prepareNodesStats().get();
+        return response.getNodes()
+            .stream()
+            .map(BaseNodeResponse::getNode)
+            .filter(DiscoveryNode::canContainData)
+            .collect(collectingAndThen(toCollection(ArrayList::new), list -> {
+                Collections.shuffle(list, random());
+                return list;
+            }));
+    }
 }
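One behavioral nit on getShuffledDataNodes(): the inline code it replaces asserted dataNodeStats.size() >= 2 before picking two nodes, whereas the helper returns whatever the stats response contains, so callers now rely on internalCluster().ensureAtLeastNumDataNodes(...) having provisioned enough nodes. A defensive call site could keep the old guard (sketch, same test class assumed):

    var dataNodes = getShuffledDataNodes();
    assertThat(dataNodes.size(), greaterThanOrEqualTo(2)); // guard carried over from the old inline version
    var primariesNode = dataNodes.get(0);
    var unluckyNode = dataNodes.get(1);
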
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
index 09ccd4475d9e1..3e45394e70e24 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java
@@ -1475,6 +1475,14 @@ protected static ClusterAdminClient clusterAdmin() {
         return admin().cluster();
     }
 
+    public void indexRandom(boolean forceRefresh, String index, int numDocs) throws InterruptedException {
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex(index).setSource("field", "value");
+        }
+        indexRandom(forceRefresh, Arrays.asList(builders));
+    }
+
     /**
      * Convenience method that forwards to {@link #indexRandom(boolean, List)}.
      */
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index b81d0da64a095..a21455da86902 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -2234,8 +2234,7 @@ public String startDataOnlyNode() {
     }
 
     public String startDataOnlyNode(Settings settings) {
-        Settings settings1 = Settings.builder().put(settings).put(dataOnlyNode(settings)).build();
-        return startNode(settings1);
+        return startNode(Settings.builder().put(settings).put(dataOnlyNode(settings)).build());
     }
 
     private synchronized void publishNode(NodeAndClient nodeAndClient) {
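The new ESIntegTestCase.indexRandom(boolean, String, int) overload above always indexes the same trivial source ("field": "value"), so it suits tests like these that assert on hit counts rather than document content. A hypothetical call site (sketch; "test" and numDocs stand in for whatever the calling test uses):

    // index numDocs identical documents into "test", optionally forcing a refresh,
    // without hand-rolling the IndexRequestBuilder array at each call site
    indexRandom(true, "test", numDocs);
    assertHitCount(client().prepareSearch().setSize(0).get(), numDocs);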