diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 8c46066da240a..b73297b8f7473 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -243,7 +244,9 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera Function stateTransformer, ActionListener> listener) { if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); + listener = delayedListener(listener, stateTransformer); + // We're delaying the state update on purpose so we added it to the listener and will just pass a dummy to the repository + stateTransformer = Function.identity(); } super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, listener); @@ -251,18 +254,20 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); + listener = delayedListener(listener, stateTransformer); + // We're delaying the state update on purpose so we added it to the listener and will just pass a dummy to the repository + stateTransformer = Function.identity(); } - super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener); + super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, stateTransformer, listener); } /** * Wraps given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked. * See {@link #COOLDOWN_PERIOD} for details. */ - private ActionListener delayedListener(ActionListener listener) { + private ActionListener delayedListener(ActionListener listener, Function stateTransformer) { final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null); assert cancellable != null; @@ -272,8 +277,23 @@ private ActionListener delayedListener(ActionListener listener) { public void onResponse(T response) { logCooldownInfo(); final Scheduler.Cancellable existing = finalizationFuture.getAndSet( - threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)), - coolDown, ThreadPool.Names.SNAPSHOT)); + threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> clusterService.submitStateUpdateTask( + "Delayed s3 repository finalization", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return stateTransformer.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + l.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + l.onResponse(response); + } + })), coolDown, ThreadPool.Names.SNAPSHOT)); assert existing == null : "Already have an ongoing finalization " + finalizationFuture; } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 8697b43b51573..a4fd3d1900396 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -88,8 +88,8 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { - in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener); + Function stateTransformer, ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, stateTransformer, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 686f0cf11b818..8cdce2e8e2760 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -145,9 +145,12 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began * @param repositoryMetaVersion version of the updated repository metadata to write + * @param stateTransformer a function that filters the last cluster state update that the snapshot delete will execute and + * is used to remove any state tracked for the in-progress snapshot from the cluster state * @param listener completion listener */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, ActionListener listener); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + Function stateTransformer, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ec375d2d22925..48ea5db8c00bc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -242,7 +242,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final BlobPath basePath; - private final ClusterService clusterService; + protected final ClusterService clusterService; /** * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for @@ -495,7 +495,7 @@ public RepositoryStats stats() { @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); } else { @@ -513,7 +513,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + SnapshotsService.useShardGenerations(repositoryMetaVersion), stateTransformer, listener); } catch (Exception ex) { listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); } @@ -566,11 +566,12 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map foundIndices, Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { if (writeShardGens) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets @@ -590,7 +591,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); } final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(), + writeIndexGen(updatedRepoData, repositoryStateId, true, stateTransformer, ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); }, listener::onFailure); // Once we have updated the repository, run the clean-ups @@ -605,9 +606,26 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY); writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = - new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion and then apply the state transformer to the + // cluster state. When not using shard-generations, we must do this after doing all the shard folder updates because + // the repository is not safe for concurrent writes until all shard folders have been updated + final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> + clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return stateTransformer.apply(currentState); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + })), 2); asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetadataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 220870fa8d31a..582d41de01420 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1263,9 +1263,12 @@ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionLis repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, minCompatibleVersion(minNodeVersion, snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()), + SnapshotsService::stateWithoutDelete, ActionListener.wrap(v -> { - logger.info("snapshot [{}] deleted", snapshot); - removeSnapshotDeletionFromClusterState(snapshot, null, l); + logger.info("Successfully deleted snapshot [{}]", snapshot); + if (l != null) { + l.onResponse(null); + } }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l) )), ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l))); })); @@ -1274,25 +1277,13 @@ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionLis /** * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state. */ - private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure, + private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, final Exception failure, @Nullable final ActionListener listener) { + assert failure != null; clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletions != null) { - boolean changed = false; - if (deletions.hasDeletionsInProgress()) { - assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress"; - SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0); - deletions = deletions.withRemovedEntry(entry); - changed = true; - } - if (changed) { - return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build(); - } - } - return currentState; + return stateWithoutDelete(currentState); } @Override @@ -1306,17 +1297,29 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (listener != null) { - if (failure != null) { - listener.onFailure(failure); - } else { - logger.info("Successfully deleted snapshot [{}]", snapshot); - listener.onResponse(null); - } + listener.onFailure(failure); } } }); } + private static ClusterState stateWithoutDelete(ClusterState currentState) { + SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletions != null) { + boolean changed = false; + if (deletions.hasDeletionsInProgress()) { + assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress"; + SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0); + deletions = deletions.withRemovedEntry(entry); + changed = true; + } + if (changed) { + return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build(); + } + } + return currentState; + } + /** * Calculates the list of shards that should be included into the current snapshot * diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 1e190cca02a6a..0e75a129baa05 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -168,7 +168,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, lo @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { listener.onResponse(null); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 588fd4c28e8b4..f59a227183dff 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -106,7 +106,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { listener.onResponse(null); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index c741595634dc2..fcbfd10c3227a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -262,7 +262,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera @Override public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, - ActionListener listener) { + Function stateTransformer, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); }