From 8402ad95f2be603b7d939085723243750356f951 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 17 Dec 2019 09:45:52 +0100 Subject: [PATCH] Use ClusterState as Consistency Source for Snapshot Repositories (#49060) Follow up to #49729 This change removes falling back to listing out the repository contents to find the latest `index-N` in write-mounted blob store repositories. This saves 2-3 list operations on each snapshot create and delete operation. Also it makes all the snapshot status APIs cheaper (and faster) by saving one list operation there as well in many cases. This removes the resiliency to concurrent modifications of the repository as a result and puts a repository in a `corrupted` state in case loading `RepositoryData` failed from the assumed generation. --- .../repositories/RepositoryData.java | 5 + .../blobstore/BlobStoreRepository.java | 215 ++++++++++++--- .../BlobStoreRepositoryRestoreTests.java | 8 +- .../CorruptedBlobStoreRepositoryIT.java | 247 ++++++++++++++++++ ...ckEventuallyConsistentRepositoryTests.java | 2 + .../AbstractThirdPartyRepositoryTestCase.java | 3 +- .../blobstore/BlobStoreTestUtil.java | 10 +- .../SourceOnlySnapshotShardTests.java | 9 +- 8 files changed, 449 insertions(+), 50 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 357268fa051e..63e2957aacc7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -58,6 +58,11 @@ public final class RepositoryData { */ public static final long UNKNOWN_REPO_GEN = -2L; + /** + * The generation value indicating that the repository generation could not be determined. + */ + public static final long CORRUPTED_REPO_GEN = -3L; + /** * An instance initialized for an empty repository. */ diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b56a22e99845..b9016394ca51 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; @@ -184,6 +185,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp */ public static final Setting COMPRESS_SETTING = Setting.boolSetting("compress", true, Setting.Property.NodeScope); + /** + * When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository + * contents will not result in the repository being marked as corrupted. + * Note: This setting is intended as a backwards compatibility solution for 7.x and will go away in 8. + */ + public static final Setting ALLOW_CONCURRENT_MODIFICATION = + Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated); + private final boolean compress; private final RateLimiter snapshotRateLimiter; @@ -216,6 +225,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final ClusterService clusterService; + /** + * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for + * {@link RepositoryMetaData#pendingGeneration()} than for {@link RepositoryMetaData#generation()} indicating a full cluster restart + * potentially accounting for the the last {@code index-N} write in the cluster state. + * Note: While it is true that this value could also be set to {@code true} for an instance on a node that is just joining the cluster + * during a new {@code index-N} write, this does not present a problem. The node will still load the correct {@link RepositoryData} in + * all cases and simply do a redundant listing of the repository contents if it tries to load {@link RepositoryData} and falls back + * to {@link #latestIndexBlobId()} to validate the value of {@link RepositoryMetaData#generation()}. + */ + private boolean uncleanStart; + + /** + * This flag indicates that the repository can not exclusively rely on the value stored in {@link #latestKnownRepoGen} to determine the + * latest repository generation but must inspect its physical contents as well via {@link #latestIndexBlobId()}. + * This flag is set in the following situations: + *
    + *
  • All repositories that are read-only, i.e. for which {@link #isReadOnly()} returns {@code true} because there are no + * guarantees that another cluster is not writing to the repository at the same time
  • + *
  • The node finds itself in a mixed-version cluster containing nodes older than + * {@link RepositoryMetaData#REPO_GEN_IN_CS_VERSION} where the master node does not update the value of + * {@link RepositoryMetaData#generation()} when writing a new {@code index-N} blob
  • + *
  • The value of {@link RepositoryMetaData#generation()} for this repository is {@link RepositoryData#UNKNOWN_REPO_GEN} + * indicating that no consistent repository generation is tracked in the cluster state yet.
  • + *
  • The {@link #uncleanStart} flag is set to {@code true}
  • + *
+ */ + private volatile boolean bestEffortConsistency; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -249,6 +286,8 @@ protected BlobStoreRepository( @Override protected void doStart() { + uncleanStart = metadata.pendingGeneration() > RepositoryData.EMPTY_REPO_GEN && + metadata.generation() != metadata.pendingGeneration(); ByteSizeValue chunkSize = chunkSize(); if (chunkSize != null && chunkSize.getBytes() <= 0) { throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]"); @@ -279,29 +318,42 @@ protected void doClose() { // #latestKnownRepoGen if a newer than currently known generation is found @Override public void updateState(ClusterState state) { - if (readOnly) { + metadata = getRepoMetaData(state); + uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration(); + bestEffortConsistency = uncleanStart || isReadOnly() + || state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION) + || metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN || ALLOW_CONCURRENT_MODIFICATION.get(metadata.settings()); + if (isReadOnly()) { // No need to waste cycles, no operations can run against a read-only repository return; } - long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN; - final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); - if (snapshotsInProgress != null) { - bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries()); - } - final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); - // Don't use generation from the delete task if we already found a generation for an in progress snapshot. - // In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist - if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) { - bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries()); - } - final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); - if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) { - bestGenerationFromCS = bestGeneration(cleanupInProgress.entries()); + if (bestEffortConsistency) { + long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN; + final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null) { + bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries()); + } + final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE); + // Don't use generation from the delete task if we already found a generation for an in progress snapshot. + // In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet + // exist + if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) { + bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries()); + } + final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); + if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) { + bestGenerationFromCS = bestGeneration(cleanupInProgress.entries()); + } + final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation()); + latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen)); + } else { + final long previousBest = latestKnownRepoGen.getAndSet(metadata.generation()); + if (previousBest != metadata.generation()) { + assert metadata.generation() == RepositoryData.CORRUPTED_REPO_GEN || previousBest < metadata.generation() : + "Illegal move from repository generation [" + previousBest + "] to generation [" + metadata.generation() + "]"; + logger.debug("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation()); + } } - - metadata = getRepoMetaData(state); - final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation()); - latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen)); } private long bestGeneration(Collection operations) { @@ -446,7 +498,12 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea */ private RepositoryData safeRepositoryData(long repositoryStateId, Map rootBlobs) { final long generation = latestGeneration(rootBlobs.keySet()); - final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId)); + final long genToLoad; + if (bestEffortConsistency) { + genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId)); + } else { + genToLoad = latestKnownRepoGen.get(); + } if (genToLoad > generation) { // It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just // debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or @@ -983,36 +1040,106 @@ public void endVerification(String seed) { // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs // and concurrent modifications. - private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); + private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN); @Override public void getRepositoryData(ActionListener listener) { - ActionListener.completeWith(listener, () -> { - // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. - while (true) { + if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) { + listener.onFailure(corruptedStateException(null)); + return; + } + // Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository. + while (true) { + final long genToLoad; + if (bestEffortConsistency) { + // We're only using #latestKnownRepoGen as a hint in this mode and listing repo contents as a secondary way of trying + // to find a higher generation final long generation; try { generation = latestIndexBlobId(); } catch (IOException ioe) { throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe); } - final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); + genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation)); if (genToLoad > generation) { logger.info("Determined repository generation [" + generation + "] from repository contents but correct generation must be at least [" + genToLoad + "]"); } - try { - return getRepositoryData(genToLoad); - } catch (RepositoryException e) { - if (genToLoad != latestKnownRepoGen.get()) { - logger.warn("Failed to load repository data generation [" + genToLoad + - "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); - continue; - } + } else { + // We only rely on the generation tracked in #latestKnownRepoGen which is exclusively updated from the cluster state + genToLoad = latestKnownRepoGen.get(); + } + try { + listener.onResponse(getRepositoryData(genToLoad)); + return; + } catch (RepositoryException e) { + if (genToLoad != latestKnownRepoGen.get()) { + logger.warn("Failed to load repository data generation [" + genToLoad + + "] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e); + continue; + } + if (bestEffortConsistency == false && ExceptionsHelper.unwrap(e, NoSuchFileException.class) != null) { + // We did not find the expected index-N even though the cluster state continues to point at the missing value + // of N so we mark this repository as corrupted. + markRepoCorrupted(genToLoad, e, + ActionListener.wrap(v -> listener.onFailure(corruptedStateException(e)), listener::onFailure)); + return; + } else { throw e; } } - }); + } + } + + private RepositoryException corruptedStateException(@Nullable Exception cause) { + return new RepositoryException(metadata.name(), + "Could not read repository data because the contents of the repository do not match its " + + "expected state. This is likely the result of either concurrently modifying the contents of the " + + "repository by a process other than this cluster or an issue with the repository's underlying" + + "storage. The repository has been disabled to prevent corrupting its contents. To re-enable it " + + "and continue using it please remove the repository from the cluster and add it again to make " + + "the cluster recover the known state of the repository from its physical contents.", cause); + } + + /** + * Marks the repository as corrupted. This puts the repository in a state where its tracked value for + * {@link RepositoryMetaData#pendingGeneration()} is unchanged while its value for {@link RepositoryMetaData#generation()} is set to + * {@link RepositoryData#CORRUPTED_REPO_GEN}. In this state, the repository can not be used any longer and must be removed and + * recreated after the problem that lead to it being marked as corrupted has been fixed. + * + * @param corruptedGeneration generation that failed to load because the index file was not found but that should have loaded + * @param originalException exception that lead to the failing to load the {@code index-N} blob + * @param listener listener to invoke once done + */ + private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener listener) { + assert corruptedGeneration != RepositoryData.UNKNOWN_REPO_GEN; + assert bestEffortConsistency == false; + clusterService.submitStateUpdateTask("mark repository corrupted [" + metadata.name() + "][" + corruptedGeneration + "]", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoriesMetaData state = currentState.metaData().custom(RepositoriesMetaData.TYPE); + final RepositoryMetaData repoState = state.repository(metadata.name()); + if (repoState.generation() != corruptedGeneration) { + throw new IllegalStateException("Tried to mark repo generation [" + corruptedGeneration + + "] as corrupted but its state concurrently changed to [" + repoState + "]"); + } + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()).putCustom( + RepositoriesMetaData.TYPE, state.withUpdatedGeneration( + metadata.name(), RepositoryData.CORRUPTED_REPO_GEN, repoState.pendingGeneration())).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(new RepositoryException(metadata.name(), "Failed marking repository state as corrupted", + ExceptionsHelper.useOrSuppress(e, originalException))); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + }); } private RepositoryData getRepositoryData(long indexGen) { @@ -1029,11 +1156,13 @@ private RepositoryData getRepositoryData(long indexGen) { return RepositoryData.snapshotsFromXContent(parser, indexGen); } } catch (IOException ioe) { - // If we fail to load the generation we tracked in latestKnownRepoGen we reset it. - // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent - // operations must start from the EMPTY_REPO_GEN again - if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) { - logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe); + if (bestEffortConsistency) { + // If we fail to load the generation we tracked in latestKnownRepoGen we reset it. + // This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent + // operations must start from the EMPTY_REPO_GEN again + if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) { + logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe); + } } throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe); } @@ -1085,13 +1214,12 @@ public ClusterState execute(ClusterState currentState) { final RepositoryMetaData meta = getRepoMetaData(currentState); final String repoName = metadata.name(); final long genInState = meta.generation(); - // TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData - final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN; + final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency; if (uninitializedMeta == false && meta.pendingGeneration() != genInState) { logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " + "safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration()); } - assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation() + assert expectedGen == RepositoryData.EMPTY_REPO_GEN || uninitializedMeta || expectedGen == meta.generation() : "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]"; // If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of @@ -1102,7 +1230,8 @@ public ClusterState execute(ClusterState currentState) { // even if a repository has been manually cleared of all contents we will never reuse the same repository generation. // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does // not offer any consistency guarantees when it comes to overwriting the same blob name with different content. - newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1; + final long nextPendingGen = metadata.pendingGeneration() + 1; + newGen = uninitializedMeta ? Math.max(expectedGen + 1, nextPendingGen) : nextPendingGen; assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen + "] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]"; return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 432091b81e1e..40e17a81be40 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; @@ -193,13 +194,16 @@ public void testSnapshotWithConflictingName() throws IOException { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), - BlobStoreTestUtil.mockClusterService(repositoryMetaData)) { + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData); + final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually } }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); repository.start(); return repository; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java new file mode 100644 index 000000000000..2a860eda3f97 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -0,0 +1,247 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase { + + public void testConcurrentlyChangeRepositoryContents() throws Exception { + Client client = client(); + + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = + getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName)); + Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1))); + + assertRepositoryBlocked(client, repoName, snapshot); + + if (randomBoolean()) { + logger.info("--> move index-N blob back to initial generation"); + Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId())); + + logger.info("--> verify repository remains blocked"); + assertRepositoryBlocked(client, repoName, snapshot); + } + + logger.info("--> remove repository"); + assertAcked(client.admin().cluster().prepareDeleteRepository(repoName)); + + logger.info("--> recreate repository"); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + logger.info("--> delete snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception { + Client client = client(); + + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = getRepositoryData(repository); + final long beforeMoveGen = repositoryData.getGenId(); + Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1))); + + logger.info("--> verify index-N blob is found at the new location"); + assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1)); + + logger.info("--> delete snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> verify index-N blob is found at the expected location"); + assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2)); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + public void testFindDanglingLatestGeneration() throws Exception { + Path repo = randomRepoPath(); + final String repoName = "test-repo"; + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + createIndex("test-idx-1", "test-idx-2"); + logger.info("--> indexing some data"); + indexRandom(true, + client().prepareIndex("test-idx-1").setSource("foo", "bar"), + client().prepareIndex("test-idx-2").setSource("foo", "bar")); + + final String snapshot = "test-snap"; + + logger.info("--> creating snapshot"); + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true).setIndices("test-idx-*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + + logger.info("--> move index-N blob to next generation"); + final RepositoryData repositoryData = getRepositoryData(repository); + final long beforeMoveGen = repositoryData.getGenId(); + Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1))); + + logger.info("--> set next generation as pending in the cluster state"); + final PlainActionFuture csUpdateFuture = PlainActionFuture.newFuture(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(RepositoriesMetaData.TYPE, + currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration( + repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + csUpdateFuture.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + csUpdateFuture.onResponse(null); + } + } + ); + csUpdateFuture.get(); + + logger.info("--> full cluster restart"); + internalCluster().fullRestart(); + ensureGreen(); + + Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + + logger.info("--> verify index-N blob is found at the new location"); + assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1)); + + logger.info("--> delete snapshot"); + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get(); + + logger.info("--> verify index-N blob is found at the expected location"); + assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2)); + + logger.info("--> make sure snapshot doesn't exist"); + expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName) + .addSnapshots(snapshot).get().getSnapshots(repoName)); + } + + private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) { + logger.info("--> try to delete snapshot"); + final RepositoryException repositoryException3 = expectThrows(RepositoryException.class, + () -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet()); + assertThat(repositoryException3.getMessage(), + containsString("Could not read repository data because the contents of the repository do not match its expected state.")); + + logger.info("--> try to create snapshot"); + final RepositoryException repositoryException4 = expectThrows(RepositoryException.class, + () -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet()); + assertThat(repositoryException4.getMessage(), + containsString("Could not read repository data because the contents of the repository do not match its expected state.")); + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 47e2626a3b7f..7443eaded77a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -140,6 +140,8 @@ public void testOverwriteSnapshotInfoBlob() { try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) { clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); repository.start(); // We create a snap- blob for snapshot "foo" in the first generation diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index c6e76ac7174e..efc0e653edf5 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -76,6 +76,7 @@ public void setUp() throws Exception { @Override public void tearDown() throws Exception { deleteAndAssertEmpty(getRepository().basePath()); + client().admin().cluster().prepareDeleteRepository("test-repo").get(); super.tearDown(); } @@ -169,8 +170,6 @@ protected void assertBlobsByPrefix(BlobPath path, String prefix, Map currentState = new AtomicReference<>(initialState); + // Setting local node as master so it may update the repository metadata in the cluster state + final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT); + final AtomicReference currentState = new AtomicReference<>( + ClusterState.builder(initialState).nodes( + DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build()); when(clusterService.state()).then(invocationOnMock -> currentState.get()); final List appliers = new CopyOnWriteArrayList<>(); doAnswer(invocation -> { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index dba66e0b1b1d..bf6512cc531e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; @@ -352,8 +353,12 @@ private Environment createEnvironment() { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), - BlobStoreTestUtil.mockClusterService(repositoryMetaData)); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData); + final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService); + clusterService.addStateApplier(e -> repository.updateState(e.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + return repository; } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {