From 718faa965ce24a631a13ea1d86b609a2f197a34b Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 30 Oct 2017 17:24:47 -0400 Subject: [PATCH 1/3] Fix snapshot getting stuck in INIT state If the master disconnects from the cluster after initiating snapshot, but just before the snapshot switches from INIT to STARTED state, the snapshot can get indefinitely stuck in the INIT state. This error is specific to v5.x+ and was triggered by keeping the master node that stepped down in the node list, the cleanup logic in snapshot/restore assumed that if master steps down it is always removed from the the node list. This commit changes the logic to trigger cleanup even if no nodes left the cluster. Closes #27180 --- .../snapshots/SnapshotsService.java | 3 +- .../discovery/SnapshotDisruptionIT.java | 160 ++++++++++++++++++ 2 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 037db4d5caf66..5985d6ef13139 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -628,7 +628,8 @@ private SnapshotShardFailure findShardFailure(List shardFa public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { - if (event.nodesRemoved()) { + // We don't remove old master when master flips anymore. So, we need to check for change in master + if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) { processSnapshotsOnRemovedNodes(event); } if (event.routingTableChanged()) { diff --git a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java new file mode 100644 index 0000000000000..5cf581537d2fb --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -0,0 +1,160 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.discovery.AbstractDisruptionTestCase; +import org.elasticsearch.discovery.DiscoverySettings; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotMissingException; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +/** + * Tests snapshot operations during disruptions. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false) +//@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE") +@TestLogging("org.elasticsearch.snapshot:TRACE") +public class SnapshotDisruptionIT extends AbstractDisruptionTestCase { + + public void testIndicesDeleted() throws Exception { + final Settings settings = Settings.builder() + .put(DEFAULT_SETTINGS) + .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed + .build(); + final String idxName = "test"; + configureCluster(settings, 4, null, 2); + final List allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3); + final String dataNode = internalCluster().startDataOnlyNode(); + ensureStableCluster(4); + + createRandomIndex(idxName); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + final String masterNode1 = internalCluster().getMasterName(); + Set otherNodes = new HashSet<>(); + otherNodes.addAll(allMasterEligibleNodes); + otherNodes.remove(masterNode1); + otherNodes.add(dataNode); + + NetworkDisruption networkDisruption = + new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes), + new NetworkDisruption.NetworkUnresponsive()); + internalCluster().setDisruptionScheme(networkDisruption); + + ClusterService clusterService = internalCluster().clusterService(masterNode1); + CountDownLatch disruptionStarted = new CountDownLatch(1); + clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().size() > 0) { + if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) { + // The snapshot started, we can start disruption so the INIT state will arrive to another master node + logger.info("--> starting disruption"); + networkDisruption.startDisrupting(); + clusterService.removeListener(this); + disruptionStarted.countDown(); + } + } + } + }); + + logger.info("--> starting snapshot"); + try { + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setMasterNodeTimeout("0s").setIndices(idxName).get(); + } catch (Exception ex) { + logger.info("Got exception", ex); + } + + logger.info("--> waiting for disruption to start"); + assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); + + logger.info("--> wait until the snapshot is done"); + + assertBusy(() -> { + SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState() + .custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().size() > 0) { + logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state()); + assertTrue(snapshots.entries().get(0).state().completed()); + } else { + logger.info("Snapshot is no longer in the cluster state"); + } + }, 1, TimeUnit.MINUTES); + + logger.info("--> verify that snapshot was successful or no longer exist"); + try { + GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + logger.info("--> done verifying"); + } catch (SnapshotMissingException exception) { + logger.info("--> snapshot doesn't exist"); + } + + } + + private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException { + assertAcked(prepareCreate(idxName, 0, Settings.builder().put("number_of_shards", between(1, 20)) + .put("number_of_replicas", 0))); + logger.info("--> indexing some data"); + final int numdocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(idxName, "type1", Integer.toString(i)).setSource("field1", "bar " + i); + } + indexRandom(true, builders); + + } + +} From c022d09f818102d3eb851ea43fd5d1884a0a3baa Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 1 Nov 2017 10:59:46 -0400 Subject: [PATCH 2/3] Improve handling of clean up on the disconnected master node --- .../common/blobstore/fs/FsBlobContainer.java | 4 +- .../snapshots/SnapshotsService.java | 33 +++++++++++-- .../discovery/SnapshotDisruptionIT.java | 48 +++++++++++-------- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 757cce7d8379a..1e384109aebce 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -140,7 +140,9 @@ public void move(String source, String target) throws IOException { Path targetPath = path.resolve(target); // If the target file exists then Files.move() behaviour is implementation specific // the existing file might be replaced or this method fails by throwing an IOException. - assert !Files.exists(targetPath); + if (Files.exists(targetPath)) { + throw new FileAlreadyExistsException("blob [" + targetPath + "] already exists, cannot overwrite"); + } Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE); IOUtils.fsync(path, true); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5985d6ef13139..ae87d52308ffb 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -425,6 +425,15 @@ public void onFailure(String source, Exception e) { removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); } + @Override + public void onNoLongerMaster(String source) { + // We are not longer a master - we shouldn't try to do any cleanup + // The new master will take care of it + logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); + userCreateSnapshotListener.onFailure( + new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted @@ -473,6 +482,10 @@ public void onFailure(Exception e) { cleanupAfterError(e); } + public void onNoLongerMaster(String source) { + userCreateSnapshotListener.onFailure(e); + } + private void cleanupAfterError(Exception exception) { if(snapshotCreated) { try { @@ -982,7 +995,7 @@ private void removeSnapshotFromClusterState(final Snapshot snapshot, final Snaps * @param listener listener to notify when snapshot information is removed from the cluster state */ private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, - @Nullable ActionListener listener) { + @Nullable CleanupAfterErrorListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @Override @@ -1014,6 +1027,11 @@ public void onFailure(String source, Exception e) { } } + @Override + public void onNoLongerMaster(String source) { + listener.onNoLongerMaster(source); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { for (SnapshotCompletionListener listener : snapshotCompletionListeners) { @@ -1184,9 +1202,16 @@ public void onSnapshotCompletion(Snapshot completedSnapshot, SnapshotInfo snapsh if (completedSnapshot.equals(snapshot)) { logger.debug("deleted snapshot completed - deleting files"); removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> - deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), - listener, true) + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), + listener, true); + + } catch (Exception ex) { + logger.warn((Supplier) () -> + new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); + } + } ); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 5cf581537d2fb..62a7ea748dc68 100644 --- a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -53,14 +53,12 @@ * Tests snapshot operations during disruptions. */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false) -//@TestLogging("_root:DEBUG,org.elasticsearch.cluster.service:TRACE") @TestLogging("org.elasticsearch.snapshot:TRACE") public class SnapshotDisruptionIT extends AbstractDisruptionTestCase { - public void testIndicesDeleted() throws Exception { + public void testDisruptionOnSnapshotInitialization() throws Exception { final Settings settings = Settings.builder() .put(DEFAULT_SETTINGS) - .put(DiscoverySettings.PUBLISH_TIMEOUT_SETTING.getKey(), "0s") // don't wait on isolated data node .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed .build(); final String idxName = "test"; @@ -78,6 +76,13 @@ public void testIndicesDeleted() throws Exception { .put("compress", randomBoolean()) .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + // Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization + // by the current master and the former master. It is not causing any issues in real life scenario, but + // might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures. + logger.info("--> initializing the repository"); + assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") + .setWaitForCompletion(true).setIndices(idxName).get().getSnapshotInfo().state()); + final String masterNode1 = internalCluster().getMasterName(); Set otherNodes = new HashSet<>(); otherNodes.addAll(allMasterEligibleNodes); @@ -109,39 +114,46 @@ public void clusterChanged(ClusterChangedEvent event) { logger.info("--> starting snapshot"); try { - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setMasterNodeTimeout("0s").setIndices(idxName).get(); + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false) + .setMasterNodeTimeout("5s").setIndices(idxName).get(); } catch (Exception ex) { - logger.info("Got exception", ex); + logger.info("--> got exception from hanged master", ex); } logger.info("--> waiting for disruption to start"); assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); logger.info("--> wait until the snapshot is done"); - assertBusy(() -> { SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState() .custom(SnapshotsInProgress.TYPE); if (snapshots != null && snapshots.entries().size() > 0) { logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state()); - assertTrue(snapshots.entries().get(0).state().completed()); + fail("Snapshot is still running"); } else { logger.info("Snapshot is no longer in the cluster state"); } }, 1, TimeUnit.MINUTES); logger.info("--> verify that snapshot was successful or no longer exist"); - try { - GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get(); - SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); - assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); - assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); - assertEquals(0, snapshotInfo.failedShards()); - logger.info("--> done verifying"); - } catch (SnapshotMissingException exception) { - logger.info("--> snapshot doesn't exist"); - } + assertBusy(() -> { + try { + GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo") + .setSnapshots("test-snap-2").get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + logger.info("--> done verifying"); + } catch (SnapshotMissingException exception) { + logger.info("--> snapshot doesn't exist"); + } + }, 1, TimeUnit.MINUTES); + logger.info("--> stopping disrupting"); + networkDisruption.stopDisrupting(); + ensureStableCluster(4, masterNode1); + logger.info("--> done"); } private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException { @@ -154,7 +166,5 @@ private void createRandomIndex(String idxName) throws ExecutionException, Interr builders[i] = client().prepareIndex(idxName, "type1", Integer.toString(i)).setSource("field1", "bar " + i); } indexRandom(true, builders); - } - } From b7eb257e8528ab4a6cde3b2d16f905fe6f2079b4 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Thu, 2 Nov 2017 13:59:24 -0400 Subject: [PATCH 3/3] Address @ywelsch's comments --- .../snapshots/SnapshotsService.java | 4 ++- .../discovery/SnapshotDisruptionIT.java | 29 ++++++++++--------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index ae87d52308ffb..0804e69e46e23 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1029,7 +1029,9 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { - listener.onNoLongerMaster(source); + if (listener != null) { + listener.onNoLongerMaster(source); + } } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 62a7ea748dc68..3458cca0cf78e 100644 --- a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -18,27 +18,23 @@ */ package org.elasticsearch.discovery; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.discovery.AbstractDisruptionTestCase; -import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.junit.annotations.TestLogging; -import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -48,6 +44,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.instanceOf; /** * Tests snapshot operations during disruptions. @@ -81,7 +78,7 @@ public void testDisruptionOnSnapshotInitialization() throws Exception { // might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures. logger.info("--> initializing the repository"); assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") - .setWaitForCompletion(true).setIndices(idxName).get().getSnapshotInfo().state()); + .setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state()); final String masterNode1 = internalCluster().getMasterName(); Set otherNodes = new HashSet<>(); @@ -113,12 +110,8 @@ public void clusterChanged(ClusterChangedEvent event) { }); logger.info("--> starting snapshot"); - try { - client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false) - .setMasterNodeTimeout("5s").setIndices(idxName).get(); - } catch (Exception ex) { - logger.info("--> got exception from hanged master", ex); - } + ActionFuture future = client(masterNode1).admin().cluster() + .prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute(); logger.info("--> waiting for disruption to start"); assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); @@ -154,6 +147,16 @@ public void clusterChanged(ClusterChangedEvent event) { networkDisruption.stopDisrupting(); ensureStableCluster(4, masterNode1); logger.info("--> done"); + + try { + future.get(); + } catch (Exception ex) { + logger.info("--> got exception from hanged master", ex); + Throwable cause = ex.getCause(); + assertThat(cause, instanceOf(MasterNotDiscoveredException.class)); + cause = cause.getCause(); + assertThat(cause, instanceOf(Discovery.FailedToCommitClusterStateException.class)); + } } private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException {