diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index bfe6d550ebd1b..268da927f5a80 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -342,15 +342,15 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) { for (final Map.Entry shardEntry : entry.getValue().entrySet()) { final ShardId shardId = shardEntry.getKey(); - final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; executor.execute(new AbstractRunnable() { final SetOnce failure = new SetOnce<>(); @Override public void doRun() { + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index bb9c408a3423a..bca7a7967cdfe 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -81,6 +81,8 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.TestCustomMetaData; +import org.elasticsearch.test.disruption.BusyMasterServiceDisruption; +import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.rest.FakeRestRequest; import java.io.IOException; @@ -1123,6 +1125,50 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { assertThat(anotherStats.getTotalSize(), is(snapshot1FileSize)); } + public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { + logger.info("--> starting a master node and two data nodes"); + internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNodes(2); + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("max_snapshot_bytes_per_sec", "1000b") + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + assertAcked(prepareCreate("test-idx", 0, Settings.builder() + .put("number_of_shards", 5).put("number_of_replicas", 0))); + ensureGreen(); + logger.info("--> indexing some data"); + final int numdocs = randomIntBetween(50, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test-idx", "type1", + Integer.toString(i)).setSource("field1", "bar " + i); + } + indexRandom(true, builders); + flushAndRefresh(); + final String dataNode = blockNodeWithIndex("test-repo", "test-idx"); + logger.info("--> snapshot"); + client(internalCluster().getMasterName()).admin().cluster() + .prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); + ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH); + setDisruptionScheme(disruption); + disruption.startDisrupting(); + logger.info("--> restarting data node, which should cause primary shards to be failed"); + internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK); + unblockNode("test-repo", dataNode); + disruption.stopDisrupting(); + // check that snapshot completes + assertBusy(() -> { + GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster() + .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get(); + assertEquals(1, snapshotsStatusResponse.getSnapshots().size()); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertTrue(snapshotInfo.state().toString(), snapshotInfo.state().completed()); + }, 30, TimeUnit.SECONDS); + } + private long calculateTotalFilesSize(List files) { return files.stream().mapToLong(f -> { try { diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/BusyMasterServiceDisruption.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/BusyMasterServiceDisruption.java new file mode 100644 index 0000000000000..3621cba1e7992 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/BusyMasterServiceDisruption.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.disruption; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.InternalTestCluster; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BusyMasterServiceDisruption extends SingleNodeDisruption { + private final AtomicBoolean active = new AtomicBoolean(); + private final Priority priority; + + public BusyMasterServiceDisruption(Random random, Priority priority) { + super(random); + this.priority = priority; + } + + @Override + public void startDisrupting() { + disruptedNode = cluster.getMasterName(); + final String disruptionNodeCopy = disruptedNode; + if (disruptionNodeCopy == null) { + return; + } + ClusterService clusterService = cluster.getInstance(ClusterService.class, disruptionNodeCopy); + if (clusterService == null) { + return; + } + logger.info("making master service busy on node [{}] at priority [{}]", disruptionNodeCopy, priority); + active.set(true); + submitTask(clusterService); + } + + private void submitTask(ClusterService clusterService) { + clusterService.getMasterService().submitStateUpdateTask( + "service_disruption_block", + new ClusterStateUpdateTask(priority) { + @Override + public ClusterState execute(ClusterState currentState) { + if (active.get()) { + submitTask(clusterService); + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("unexpected error during disruption", e); + } + } + ); + } + + @Override + public void stopDisrupting() { + active.set(false); + } + + @Override + public void removeAndEnsureHealthy(InternalTestCluster cluster) { + removeFromCluster(cluster); + } + + @Override + public TimeValue expectedTimeToHeal() { + return TimeValue.timeValueMinutes(0); + } +}