From 9bb770e1338b43ec2278778671a93b26d3cb031c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 17 Aug 2021 13:53:33 +0200 Subject: [PATCH] Fix Snapshot State Machine Issues around Failed Clones (#76419) (#76604) With recent fixes it is never correct to simply remove a snapshot from the cluster state without updating other snapshot entries if an entry contains any successful shards due to possible dependencies. This change reproduces two issues resulting from simply removing snapshot without regard for other queued operations and fixes them by having all removal of snapshot from the cluster state go through the same code path. Also, this change moves the tracking of a snapshot as "ending" up a few lines to fix an assertion about finishing snapshots that forces them to be in this collection. --- .../repositories/s3/S3Repository.java | 24 ++--- .../snapshots/CloneSnapshotIT.java | 65 ++++++++++++++ .../RepositoryFilterUserMetadataIT.java | 28 +----- .../repositories/FilterRepository.java | 21 +---- .../repositories/FinalizeSnapshotContext.java | 89 +++++++++++++++++++ .../repositories/Repository.java | 19 +--- .../blobstore/BlobStoreRepository.java | 36 +++----- .../elasticsearch/snapshots/SnapshotInfo.java | 5 +- .../snapshots/SnapshotsService.java | 69 +++++--------- .../RepositoriesServiceTests.java | 14 +-- .../BlobStoreRepositoryRestoreTests.java | 46 +++++----- .../index/shard/RestoreOnlyRepository.java | 10 +-- .../AbstractSnapshotIntegTestCase.java | 6 +- .../snapshots/mockstore/MockRepository.java | 24 +++-- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../SourceOnlySnapshotRepository.java | 31 ++++--- .../SourceOnlySnapshotShardTests.java | 12 +-- .../encrypted/EncryptedRepository.java | 62 +++++++------ 18 files changed, 323 insertions(+), 244 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index d8c4e49de1ea3..b472a24d3fdad 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -13,8 +13,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -32,12 +30,11 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -276,15 +273,18 @@ private static Map buildLocation(RepositoryMetadata metadata) { private final AtomicReference finalizationFuture = new AtomicReference<>(); @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { + finalizeSnapshotContext = new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + finalizeSnapshotContext.clusterMetadata(), + finalizeSnapshotContext.snapshotInfo(), + finalizeSnapshotContext.repositoryMetaVersion(), + delayedListener(finalizeSnapshotContext) + ); } - super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion, - stateTransformer, listener); + super.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index f6cc824b70ec4..798bbc8336d1b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexNotFoundException; @@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception { assertAcked(clone2.get()); } + public void testRemoveFailedCloneFromCSWithoutIO() throws Exception { + final String masterNode = internalCluster().startMasterOnlyNode(); + internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockAndFailMasterOnShardClone(repoName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex); + awaitNumberOfSnapshotsInProgress(1); + waitForBlock(masterNode, repoName); + unblockNode(repoName, masterNode); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + awaitNoMoreRunningOperations(); + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1); + assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); + } + + public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception { + // single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard + final String masterNode = internalCluster().startMasterOnlyNode( + Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build() + ); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + final String testIndex = "index-test"; + final String testIndex2 = "index-test-2"; + createIndexWithContent(testIndex); + createIndexWithContent(testIndex2); + + final String sourceSnapshot = "source-snapshot"; + createFullSnapshot(repoName, sourceSnapshot); + + final String targetSnapshot = "target-snapshot"; + blockAndFailMasterOnShardClone(repoName); + + createIndexWithContent("test-index-3"); + blockDataNode(repoName, dataNode); + final ActionFuture fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1"); + waitForBlock(dataNode, repoName); + final ActionFuture cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2); + awaitNumberOfSnapshotsInProgress(2); + waitForBlock(masterNode, repoName); + unblockNode(repoName, masterNode); + final ActionFuture fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2"); + expectThrows(SnapshotException.class, cloneFuture::actionGet); + unblockNode(repoName, dataNode); + awaitNoMoreRunningOperations(); + assertSuccessful(fullSnapshotFuture1); + assertSuccessful(fullSnapshotFuture2); + assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3); + assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get()); + } + private ActionFuture startCloneFromDataNode( String repoName, String sourceSnapshot, @@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) { AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta(); } + private void blockAndFailMasterOnShardClone(String repoName) { + AbstractSnapshotIntegTestCase.getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta(); + } + /** * Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful. */ diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index f15f6a1149f98..6f7fcd6abf6f0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -7,10 +7,6 @@ */ package org.elasticsearch.snapshots; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -19,9 +15,8 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.test.ESIntegTestCase; @@ -29,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.function.Function; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.is; @@ -89,24 +83,8 @@ public Map getRepositories( private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE); @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - super.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener - ); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + super.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 18f807aac0552..1c8249e82f25c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -23,7 +23,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.io.IOException; import java.util.Collection; @@ -75,24 +74,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - in.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener - ); + public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + in.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java new file mode 100644 index 0000000000000..646a44b77a425 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotsService; + +/** + * Context for finalizing a snapshot. + */ +public final class FinalizeSnapshotContext extends ActionListener.Delegating< + Tuple, + Tuple> { + + private final ShardGenerations updatedShardGenerations; + + private final long repositoryStateId; + + private final Metadata clusterMetadata; + + private final SnapshotInfo snapshotInfo; + + private final Version repositoryMetaVersion; + + /** + * @param updatedShardGenerations updated shard generations + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began + * @param clusterMetadata cluster metadata + * @param snapshotInfo SnapshotInfo instance to write for this snapshot + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param listener listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing + * the snapshot + */ + public FinalizeSnapshotContext( + ShardGenerations updatedShardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + ActionListener> listener + ) { + super(listener); + this.updatedShardGenerations = updatedShardGenerations; + this.repositoryStateId = repositoryStateId; + this.clusterMetadata = clusterMetadata; + this.snapshotInfo = snapshotInfo; + this.repositoryMetaVersion = repositoryMetaVersion; + } + + public long repositoryStateId() { + return repositoryStateId; + } + + public ShardGenerations updatedShardGenerations() { + return updatedShardGenerations; + } + + public SnapshotInfo snapshotInfo() { + return snapshotInfo; + } + + public Version repositoryMetaVersion() { + return repositoryMetaVersion; + } + + public Metadata clusterMetadata() { + return clusterMetadata; + } + + public ClusterState updatedClusterState(ClusterState state) { + return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot()); + } + + @Override + public void onResponse(Tuple repositoryData) { + delegate.onResponse(repositoryData); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index a030a98602813..bbd051807cca9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -147,24 +147,9 @@ public void onFailure(Exception e) { *

* This method is called on master after all shards are snapshotted. * - * @param shardGenerations updated shard generations - * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began - * @param clusterMetadata cluster metadata - * @param snapshotInfo SnapshotInfo instance to write for this snapshot - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and - * is used to remove any state tracked for the in-progress snapshot from the cluster state - * @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot + * @param finalizeSnapshotContext finalization context */ - void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ); + void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext); /** * Deletes snapshots diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 53b034e1bd5f8..67c68284483ed 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -1350,15 +1351,10 @@ private DeleteResult cleanupStaleIndices(Map foundIndices } @Override - public void finalizeSnapshot( - final ShardGenerations shardGenerations, - final long repositoryStateId, - final Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - final ActionListener listener - ) { + public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); + final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); + final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; final Collection indices = shardGenerations.indices(); @@ -1367,8 +1363,9 @@ public void finalizeSnapshot( // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. + final Version repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion(); final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); - final Consumer onUpdateFailure = e -> listener.onFailure( + final Consumer onUpdateFailure = e -> finalizeSnapshotContext.onFailure( new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e) ); @@ -1381,7 +1378,7 @@ public void finalizeSnapshot( repoDataListener.whenComplete(existingRepositoryData -> { final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size(); if (existingSnapshotCount >= maxSnapshotCount) { - listener.onFailure( + finalizeSnapshotContext.onFailure( new RepositoryException( metadata.name(), "Cannot add another snapshot to this repository as it " @@ -1412,23 +1409,16 @@ public void finalizeSnapshot( snapshotInfo.startTime(), snapshotInfo.endTime() ); - final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( - snapshotId, - snapshotDetails, - shardGenerations, - indexMetas, - indexMetaIdentifiers - ); writeIndexGen( - updatedRepositoryData, + existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers), repositoryStateId, repositoryMetaVersion, - stateTransformer, + finalizeSnapshotContext::updatedClusterState, ActionListener.wrap(newRepoData -> { if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + cleanupOldShardGens(existingRepositoryData, newRepoData); } - listener.onResponse(newRepoData); + finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo)); }, onUpdateFailure) ); }, onUpdateFailure), 2 + indices.size()); @@ -1438,7 +1428,7 @@ public void finalizeSnapshot( // index or global metadata will be compatible with the segments written in this snapshot as well. // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way // that decrements the generation it points at - + final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata(); // Write Global MetaData executor.execute( ActionRunnable.run( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 4add6d73fdb68..d104d2136164a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -427,7 +427,7 @@ public SnapshotInfo( ); } - SnapshotInfo( + public SnapshotInfo( Snapshot snapshot, List indices, List dataStreams, @@ -457,7 +457,8 @@ public SnapshotInfo( this.successfulShards = successfulShards; this.shardFailures = org.elasticsearch.core.List.copyOf(shardFailures); this.includeGlobalState = includeGlobalState; - this.userMetadata = userMetadata; + this.userMetadata = userMetadata == null ? null : org.elasticsearch.core.Map.copyOf(userMetadata); + ; this.indexSnapshotDetails = org.elasticsearch.core.Map.copyOf(indexSnapshotDetails); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1798e4bb1bb11..922e6d2dbd167 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -73,6 +73,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -1856,7 +1857,9 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu } if (entry.isClone() && entry.state() == State.FAILED) { logger.debug("Removing failed snapshot clone [{}] from cluster state", entry); - removeFailedSnapshotFromClusterState(entry.snapshot(), new SnapshotException(entry.snapshot(), entry.failure()), null, null); + if (newFinalization) { + removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null, null); + } return; } final String repoName = snapshot.getRepository(); @@ -2011,24 +2014,24 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit entry.partial() ? shardGenerations.totalShards() : entry.shardsByRepoShardId().size(), shardFailures, entry.includeGlobalState(), - // TODO: remove this hack making the metadata mutable once - // https://github.com/elastic/elasticsearch/pull/72776 has been merged - entry.userMetadata() == null ? null : new HashMap<>(entry.userMetadata()), + entry.userMetadata(), entry.startTime(), indexSnapshotDetails ); repo.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metaForSnapshot, - snapshotInfo, - entry.version(), - state -> stateWithoutSuccessfulSnapshot(state, snapshot), - ActionListener.wrap(newRepoData -> { - completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo)); - logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); - runNextQueuedOperation(newRepoData, repository, true); - }, e -> handleFinalizationFailure(e, snapshot, repositoryData)) + new FinalizeSnapshotContext( + shardGenerations, + repositoryData.getGenId(), + metaForSnapshot, + snapshotInfo, + entry.version(), + ActionListener.wrap(result -> { + final SnapshotInfo writtenSnapshotInfo = result.v2(); + completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result); + logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state()); + runNextQueuedOperation(result.v1(), repository, true); + }, e -> handleFinalizationFailure(e, snapshot, repositoryData)) + ) ); }, e -> handleFinalizationFailure(e, snapshot, repositoryData)); } catch (Exception e) { @@ -2218,15 +2221,15 @@ private static Tuple> read } /** - * Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the - * given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them - * will not cause rolling back to an outdated shard generation. + * Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update + * the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an + * outdated shard generation. * * @param state current cluster state * @param snapshot snapshot for which to remove the snapshot operation * @return updated cluster state */ - private static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) { + public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) { // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance // BlobStoreTestUtil to catch this leak SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); @@ -2381,32 +2384,6 @@ private static ImmutableOpenMap.Builder maybeAddUpda return updatedShardAssignments; } - /** - * Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at - * any point before being finalized in the repository. - * - * @param state current cluster state - * @param snapshot snapshot for which to remove the snapshot operation - * @return updated cluster state - */ - private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) { - SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - ClusterState result = state; - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot)) { - changed = true; - } else { - entries.add(entry); - } - } - if (changed) { - result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); - } - return readyDeletions(result).v1(); - } - /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only * used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the @@ -2429,7 +2406,7 @@ private void removeFailedSnapshotFromClusterState( @Override public ClusterState execute(ClusterState currentState) { - final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot); + final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot); assert updatedState == currentState || endingSnapshots.contains(snapshot) : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state"; // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4e1bbf2dfd028..9b374449f6dcf 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; @@ -226,17 +225,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } - @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - listener.onResponse(null); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + finalizeSnapshotContext.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 4341af98b2d9b..0b60ed8c2956c 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -35,6 +36,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -50,7 +52,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; @@ -171,28 +172,29 @@ public void testSnapshotWithConflictingName() throws Exception { new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2") ); final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build(); - PlainActionFuture.get( + PlainActionFuture., Exception>get( f -> repository.finalizeSnapshot( - shardGenerations, - RepositoryData.EMPTY_REPO_GEN, - Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), - new SnapshotInfo( - snapshot, - shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - Collections.emptyList(), - Collections.emptyList(), - null, - 1L, - 6, - Collections.emptyList(), - true, - Collections.emptyMap(), - 0L, - Collections.emptyMap() - ), - Version.CURRENT, - Function.identity(), - f + new FinalizeSnapshotContext( + shardGenerations, + RepositoryData.EMPTY_REPO_GEN, + Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), + new SnapshotInfo( + snapshot, + shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + Collections.emptyList(), + Collections.emptyList(), + null, + 1L, + 6, + Collections.emptyList(), + true, + Collections.emptyMap(), + 0L, + Collections.emptyMap() + ), + Version.CURRENT, + f + ) ) ); IndexShardSnapshotFailedException isfe = expectThrows( diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 44ffde8a8ddbc..eccb721dd8611 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -27,7 +28,6 @@ import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.util.Collection; import java.util.Collections; @@ -97,12 +97,8 @@ public void getRepositoryData(ActionListener listener) { public void initializeSnapshot(SnapshotId snapshotId, List indices, Metadata metadata) { } - @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, - Metadata clusterMetadata, SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { - listener.onResponse(null); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + finalizeSnapshotContext.onResponse(null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index c0ae87a698c36..8cf729588d0f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -43,8 +43,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.core.Tuple; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -484,9 +486,9 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget(f -> repo.finalizeSnapshot( + PlainActionFuture., Exception>get(f -> repo.finalizeSnapshot(new FinalizeSnapshotContext( ShardGenerations.EMPTY, getRepositoryData(repoName).getGenId(), state.metadata(), snapshotInfo, - SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f)); + SnapshotsService.OLD_SNAPSHOT_FORMAT, f))); } protected void awaitNDeletionsInProgress(int count) throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index ac6b92e69ee05..4f155297186cd 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -135,6 +135,8 @@ public long getFailureCount() { private volatile boolean blockOnWriteShardLevelMeta; + private volatile boolean blockAndFailOnWriteShardLevelMeta; + private volatile boolean blockOnReadIndexMeta; private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false); @@ -220,6 +222,7 @@ public synchronized void unblock() { blockedIndexId = null; blockOnDeleteIndexN = false; blockOnWriteShardLevelMeta = false; + blockAndFailOnWriteShardLevelMeta = false; blockOnReadIndexMeta = false; blockOnceOnReadSnapshotInfo.set(false); this.notifyAll(); @@ -262,9 +265,15 @@ public void setBlockOnDeleteIndexFile() { } public void setBlockOnWriteShardLevelMeta() { + assert blockAndFailOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both"; blockOnWriteShardLevelMeta = true; } + public void setBlockAndFailOnWriteShardLevelMeta() { + assert blockOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both"; + blockAndFailOnWriteShardLevelMeta = true; + } + public void setBlockOnReadIndexMeta() { blockOnReadIndexMeta = true; } @@ -295,9 +304,9 @@ private synchronized boolean blockExecution() { logger.debug("[{}] Blocking execution", metadata.name()); boolean wasBlocked = false; try { - while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile || - blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta || - blockAndFailOnDataFiles || blockedIndexId != null) { + while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile + || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta + || blockOnReadIndexMeta || blockedIndexId != null) { blocked = true; this.wait(); wasBlocked = true; @@ -539,9 +548,12 @@ public void writeBlob(String blobName, private void beforeWrite(String blobName) throws IOException { maybeIOExceptionOrBlock(blobName); - if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) - && path().equals(basePath()) == false) { - blockExecutionAndMaybeWait(blobName); + if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) { + if (blockOnWriteShardLevelMeta) { + blockExecutionAndMaybeWait(blobName); + } else if (blockAndFailOnWriteShardLevelMeta) { + blockExecutionAndFail(blobName); + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 683a2c40a17f2..6d29e3f4cc0b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -63,6 +63,7 @@ import org.elasticsearch.indices.recovery.MultiChunkTransfer; import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -297,10 +298,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java index 6acf693170028..bd51c903f593e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java @@ -19,9 +19,6 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.SimpleFSDirectory; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -42,13 +39,11 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.repositories.FilterRepository; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.io.Closeable; import java.io.IOException; @@ -106,18 +101,26 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - super.finalizeSnapshot(shardGenerations, repositoryStateId, metadataToSnapshot(shardGenerations.indices(), metadata), - snapshotInfo, repositoryMetaVersion, stateTransformer, listener); - } catch (IOException ex) { - listener.onFailure(ex); + super.finalizeSnapshot( + new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + metadataToSnapshot( + finalizeSnapshotContext.updatedShardGenerations().indices(), + finalizeSnapshotContext.clusterMetadata() + ), + finalizeSnapshotContext.snapshotInfo(), + finalizeSnapshotContext.repositoryMetaVersion(), + finalizeSnapshotContext + ) + ); + } catch (IOException e) { + finalizeSnapshotContext.onFailure(e); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 94a2b4b75ac5f..23f19701bcb81 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -63,12 +64,13 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; @@ -83,7 +85,6 @@ import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.function.Function; import java.util.stream.Collectors; public class SourceOnlySnapshotShardTests extends IndexShardTestCase { @@ -227,10 +228,10 @@ public void testRestoreMinmal() throws IOException { repository.snapshotShard(new SnapshotShardContext(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); future.actionGet(); - final PlainActionFuture finFuture = PlainActionFuture.newFuture(); + final PlainActionFuture> finFuture = PlainActionFuture.newFuture(); final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(); - repository.finalizeSnapshot( + repository.finalizeSnapshot(new FinalizeSnapshotContext( shardGenerations, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), @@ -248,7 +249,8 @@ public void testRestoreMinmal() throws IOException { Collections.emptyMap(), 0L, Collections.emptyMap()), - Version.CURRENT, Function.identity(), finFuture); + Version.CURRENT, finFuture + )); finFuture.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); diff --git a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java index eb1411bc1a2ed..49fb12059ec3a 100644 --- a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java +++ b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java @@ -10,10 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; @@ -40,10 +36,9 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryStats; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotInfo; @@ -202,34 +197,45 @@ public Map adaptUserMetadata(Map userMetadata) { } @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); try { validateLocalRepositorySecret(snapshotInfo.userMetadata()); } catch (RepositoryException passwordValidationException) { - listener.onFailure(passwordValidationException); + finalizeSnapshotContext.onFailure(passwordValidationException); return; - } finally { - // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response - // to the user - snapshotInfo.userMetadata().remove(PASSWORD_HASH_USER_METADATA_KEY); - snapshotInfo.userMetadata().remove(PASSWORD_SALT_USER_METADATA_KEY); } + // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response + // to the user + final Map updatedUserMetadata = new HashMap<>(snapshotInfo.userMetadata()); + updatedUserMetadata.remove(PASSWORD_HASH_USER_METADATA_KEY); + updatedUserMetadata.remove(PASSWORD_SALT_USER_METADATA_KEY); + final SnapshotInfo updatedSnapshotInfo = new SnapshotInfo( + snapshotInfo.snapshot(), + snapshotInfo.indices(), + snapshotInfo.dataStreams(), + snapshotInfo.featureStates(), + snapshotInfo.reason(), + snapshotInfo.version(), + snapshotInfo.startTime(), + snapshotInfo.endTime(), + snapshotInfo.totalShards(), + snapshotInfo.successfulShards(), + snapshotInfo.shardFailures(), + snapshotInfo.includeGlobalState(), + updatedUserMetadata, + snapshotInfo.state(), + snapshotInfo.indexSnapshotDetails() + ); super.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener + new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + finalizeSnapshotContext.clusterMetadata(), + updatedSnapshotInfo, + finalizeSnapshotContext.repositoryMetaVersion(), + finalizeSnapshotContext + ) ); }