From eadfeac0ec6cb0be5cb24b1d41c1013bcb77417a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 17 Aug 2021 09:05:49 +0200 Subject: [PATCH] Refactor Snapshot Finalization Method (#76005) (#76594) This refactors the signature of snapshot finalization. For one it allows removing a TODO about being dependent on mutable `SnapshotInfo` which was not great but more importantly this sets up a follow-up where state can be shared between the cluster state update at the end of finalization and subsequent old-shard-generation cleanup so that we can resolve another open TODO about leaking shard generation files in some cases. --- .../repositories/s3/S3Repository.java | 24 ++--- .../RepositoryFilterUserMetadataIT.java | 28 +----- .../repositories/FilterRepository.java | 21 +---- .../repositories/FinalizeSnapshotContext.java | 89 +++++++++++++++++++ .../repositories/Repository.java | 19 +--- .../blobstore/BlobStoreRepository.java | 36 +++----- .../elasticsearch/snapshots/SnapshotInfo.java | 5 +- .../snapshots/SnapshotsService.java | 31 +++---- .../RepositoriesServiceTests.java | 14 +-- .../BlobStoreRepositoryRestoreTests.java | 46 +++++----- .../index/shard/RestoreOnlyRepository.java | 10 +-- .../AbstractSnapshotIntegTestCase.java | 6 +- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../SourceOnlySnapshotRepository.java | 31 ++++--- .../SourceOnlySnapshotShardTests.java | 12 +-- .../encrypted/EncryptedRepository.java | 62 +++++++------ 16 files changed, 233 insertions(+), 207 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index d8c4e49de1ea3..b472a24d3fdad 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -13,8 +13,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -32,12 +30,11 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -276,15 +273,18 @@ private static Map buildLocation(RepositoryMetadata metadata) { private final AtomicReference finalizationFuture = new AtomicReference<>(); @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { - if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { - listener = delayedListener(listener); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) { + finalizeSnapshotContext = new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + finalizeSnapshotContext.clusterMetadata(), + finalizeSnapshotContext.snapshotInfo(), + finalizeSnapshotContext.repositoryMetaVersion(), + delayedListener(finalizeSnapshotContext) + ); } - super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion, - stateTransformer, listener); + super.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index f15f6a1149f98..6f7fcd6abf6f0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -7,10 +7,6 @@ */ package org.elasticsearch.snapshots; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; @@ -19,9 +15,8 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.test.ESIntegTestCase; @@ -29,7 +24,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.function.Function; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.is; @@ -89,24 +83,8 @@ public Map getRepositories( private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE); @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - super.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener - ); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + super.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 18f807aac0552..1c8249e82f25c 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -23,7 +23,6 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.io.IOException; import java.util.Collection; @@ -75,24 +74,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - in.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener - ); + public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + in.finalizeSnapshot(finalizeSnapshotContext); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java new file mode 100644 index 0000000000000..7e1ad3f3f83c1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java @@ -0,0 +1,89 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotsService; + +/** + * Context for finalizing a snapshot. + */ +public final class FinalizeSnapshotContext extends ActionListener.Delegating< + Tuple, + Tuple> { + + private final ShardGenerations updatedShardGenerations; + + private final long repositoryStateId; + + private final Metadata clusterMetadata; + + private final SnapshotInfo snapshotInfo; + + private final Version repositoryMetaVersion; + + /** + * @param updatedShardGenerations updated shard generations + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began + * @param clusterMetadata cluster metadata + * @param snapshotInfo SnapshotInfo instance to write for this snapshot + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param listener listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing + * the snapshot + */ + public FinalizeSnapshotContext( + ShardGenerations updatedShardGenerations, + long repositoryStateId, + Metadata clusterMetadata, + SnapshotInfo snapshotInfo, + Version repositoryMetaVersion, + ActionListener> listener + ) { + super(listener); + this.updatedShardGenerations = updatedShardGenerations; + this.repositoryStateId = repositoryStateId; + this.clusterMetadata = clusterMetadata; + this.snapshotInfo = snapshotInfo; + this.repositoryMetaVersion = repositoryMetaVersion; + } + + public long repositoryStateId() { + return repositoryStateId; + } + + public ShardGenerations updatedShardGenerations() { + return updatedShardGenerations; + } + + public SnapshotInfo snapshotInfo() { + return snapshotInfo; + } + + public Version repositoryMetaVersion() { + return repositoryMetaVersion; + } + + public Metadata clusterMetadata() { + return clusterMetadata; + } + + public ClusterState updatedClusterState(ClusterState state) { + return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot()); + } + + @Override + public void onResponse(Tuple repositoryData) { + delegate.onResponse(repositoryData); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index a030a98602813..bbd051807cca9 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -147,24 +147,9 @@ public void onFailure(Exception e) { *

* This method is called on master after all shards are snapshotted. * - * @param shardGenerations updated shard generations - * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began - * @param clusterMetadata cluster metadata - * @param snapshotInfo SnapshotInfo instance to write for this snapshot - * @param repositoryMetaVersion version of the updated repository metadata to write - * @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and - * is used to remove any state tracked for the in-progress snapshot from the cluster state - * @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot + * @param finalizeSnapshotContext finalization context */ - void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ); + void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext); /** * Deletes snapshots diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 53b034e1bd5f8..67c68284483ed 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -1350,15 +1351,10 @@ private DeleteResult cleanupStaleIndices(Map foundIndices } @Override - public void finalizeSnapshot( - final ShardGenerations shardGenerations, - final long repositoryStateId, - final Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - final ActionListener listener - ) { + public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) { + final long repositoryStateId = finalizeSnapshotContext.repositoryStateId(); + final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations(); + final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; final Collection indices = shardGenerations.indices(); @@ -1367,8 +1363,9 @@ public void finalizeSnapshot( // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. + final Version repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion(); final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); - final Consumer onUpdateFailure = e -> listener.onFailure( + final Consumer onUpdateFailure = e -> finalizeSnapshotContext.onFailure( new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e) ); @@ -1381,7 +1378,7 @@ public void finalizeSnapshot( repoDataListener.whenComplete(existingRepositoryData -> { final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size(); if (existingSnapshotCount >= maxSnapshotCount) { - listener.onFailure( + finalizeSnapshotContext.onFailure( new RepositoryException( metadata.name(), "Cannot add another snapshot to this repository as it " @@ -1412,23 +1409,16 @@ public void finalizeSnapshot( snapshotInfo.startTime(), snapshotInfo.endTime() ); - final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( - snapshotId, - snapshotDetails, - shardGenerations, - indexMetas, - indexMetaIdentifiers - ); writeIndexGen( - updatedRepositoryData, + existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers), repositoryStateId, repositoryMetaVersion, - stateTransformer, + finalizeSnapshotContext::updatedClusterState, ActionListener.wrap(newRepoData -> { if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + cleanupOldShardGens(existingRepositoryData, newRepoData); } - listener.onResponse(newRepoData); + finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo)); }, onUpdateFailure) ); }, onUpdateFailure), 2 + indices.size()); @@ -1438,7 +1428,7 @@ public void finalizeSnapshot( // index or global metadata will be compatible with the segments written in this snapshot as well. // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way // that decrements the generation it points at - + final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata(); // Write Global MetaData executor.execute( ActionRunnable.run( diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java index 4add6d73fdb68..d104d2136164a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotInfo.java @@ -427,7 +427,7 @@ public SnapshotInfo( ); } - SnapshotInfo( + public SnapshotInfo( Snapshot snapshot, List indices, List dataStreams, @@ -457,7 +457,8 @@ public SnapshotInfo( this.successfulShards = successfulShards; this.shardFailures = org.elasticsearch.core.List.copyOf(shardFailures); this.includeGlobalState = includeGlobalState; - this.userMetadata = userMetadata; + this.userMetadata = userMetadata == null ? null : org.elasticsearch.core.Map.copyOf(userMetadata); + ; this.indexSnapshotDetails = org.elasticsearch.core.Map.copyOf(indexSnapshotDetails); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 1798e4bb1bb11..cfcd567145040 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -73,6 +73,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -2011,24 +2012,24 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit entry.partial() ? shardGenerations.totalShards() : entry.shardsByRepoShardId().size(), shardFailures, entry.includeGlobalState(), - // TODO: remove this hack making the metadata mutable once - // https://github.com/elastic/elasticsearch/pull/72776 has been merged - entry.userMetadata() == null ? null : new HashMap<>(entry.userMetadata()), + entry.userMetadata(), entry.startTime(), indexSnapshotDetails ); repo.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metaForSnapshot, - snapshotInfo, - entry.version(), - state -> stateWithoutSuccessfulSnapshot(state, snapshot), - ActionListener.wrap(newRepoData -> { - completeListenersIgnoringException(endAndGetListenersToResolve(snapshot), Tuple.tuple(newRepoData, snapshotInfo)); - logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); - runNextQueuedOperation(newRepoData, repository, true); - }, e -> handleFinalizationFailure(e, snapshot, repositoryData)) + new FinalizeSnapshotContext( + shardGenerations, + repositoryData.getGenId(), + metaForSnapshot, + snapshotInfo, + entry.version(), + ActionListener.wrap(result -> { + final SnapshotInfo writtenSnapshotInfo = result.v2(); + completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result); + logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state()); + runNextQueuedOperation(result.v1(), repository, true); + }, e -> handleFinalizationFailure(e, snapshot, repositoryData)) + ) ); }, e -> handleFinalizationFailure(e, snapshot, repositoryData)); } catch (Exception e) { @@ -2226,7 +2227,7 @@ private static Tuple> read * @param snapshot snapshot for which to remove the snapshot operation * @return updated cluster state */ - private static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) { + public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) { // TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance // BlobStoreTestUtil to catch this leak SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4e1bbf2dfd028..9b374449f6dcf 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -38,7 +38,6 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; @@ -226,17 +225,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } - @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { - listener.onResponse(null); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + finalizeSnapshotContext.onResponse(null); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 4341af98b2d9b..0b60ed8c2956c 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -35,6 +36,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -50,7 +52,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.function.Function; import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; @@ -171,28 +172,29 @@ public void testSnapshotWithConflictingName() throws Exception { new SnapshotId(snapshot.getSnapshotId().getName(), "_uuid2") ); final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build(); - PlainActionFuture.get( + PlainActionFuture., Exception>get( f -> repository.finalizeSnapshot( - shardGenerations, - RepositoryData.EMPTY_REPO_GEN, - Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), - new SnapshotInfo( - snapshot, - shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), - Collections.emptyList(), - Collections.emptyList(), - null, - 1L, - 6, - Collections.emptyList(), - true, - Collections.emptyMap(), - 0L, - Collections.emptyMap() - ), - Version.CURRENT, - Function.identity(), - f + new FinalizeSnapshotContext( + shardGenerations, + RepositoryData.EMPTY_REPO_GEN, + Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), + new SnapshotInfo( + snapshot, + shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()), + Collections.emptyList(), + Collections.emptyList(), + null, + 1L, + 6, + Collections.emptyList(), + true, + Collections.emptyMap(), + 0L, + Collections.emptyMap() + ), + Version.CURRENT, + f + ) ) ); IndexShardSnapshotFailedException isfe = expectThrows( diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 44ffde8a8ddbc..eccb721dd8611 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -27,7 +28,6 @@ import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.util.Collection; import java.util.Collections; @@ -97,12 +97,8 @@ public void getRepositoryData(ActionListener listener) { public void initializeSnapshot(SnapshotId snapshotId, List indices, Metadata metadata) { } - @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, - Metadata clusterMetadata, SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { - listener.onResponse(null); + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + finalizeSnapshotContext.onResponse(null); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index c0ae87a698c36..8cf729588d0f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -43,8 +43,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.core.Tuple; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -484,9 +486,9 @@ protected void addBwCFailedSnapshot(String repoName, String snapshotName, Mapget(f -> repo.finalizeSnapshot( + PlainActionFuture., Exception>get(f -> repo.finalizeSnapshot(new FinalizeSnapshotContext( ShardGenerations.EMPTY, getRepositoryData(repoName).getGenId(), state.metadata(), snapshotInfo, - SnapshotsService.OLD_SNAPSHOT_FORMAT, Function.identity(), f)); + SnapshotsService.OLD_SNAPSHOT_FORMAT, f))); } protected void awaitNDeletionsInProgress(int count) throws Exception { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 683a2c40a17f2..6d29e3f4cc0b9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -63,6 +63,7 @@ import org.elasticsearch.indices.recovery.MultiChunkTransfer; import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.GetSnapshotInfoContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexMetaDataGenerations; @@ -297,10 +298,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java index 6acf693170028..bd51c903f593e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotRepository.java @@ -19,9 +19,6 @@ import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.SimpleFSDirectory; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -42,13 +39,11 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.repositories.FilterRepository; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.snapshots.SnapshotId; -import org.elasticsearch.snapshots.SnapshotInfo; import java.io.Closeable; import java.io.IOException; @@ -106,18 +101,26 @@ public void initializeSnapshot(SnapshotId snapshotId, List indices, Met } @Override - public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata, - SnapshotInfo snapshotInfo, Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - super.finalizeSnapshot(shardGenerations, repositoryStateId, metadataToSnapshot(shardGenerations.indices(), metadata), - snapshotInfo, repositoryMetaVersion, stateTransformer, listener); - } catch (IOException ex) { - listener.onFailure(ex); + super.finalizeSnapshot( + new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + metadataToSnapshot( + finalizeSnapshotContext.updatedShardGenerations().indices(), + finalizeSnapshotContext.clusterMetadata() + ), + finalizeSnapshotContext.snapshotInfo(), + finalizeSnapshotContext.repositoryMetaVersion(), + finalizeSnapshotContext + ) + ); + } catch (IOException e) { + finalizeSnapshotContext.onFailure(e); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 94a2b4b75ac5f..23f19701bcb81 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.Tuple; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -63,12 +64,13 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; -import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.ShardSnapshotResult; +import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.fs.FsRepository; @@ -83,7 +85,6 @@ import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.function.Function; import java.util.stream.Collectors; public class SourceOnlySnapshotShardTests extends IndexShardTestCase { @@ -227,10 +228,10 @@ public void testRestoreMinmal() throws IOException { repository.snapshotShard(new SnapshotShardContext(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef, null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); future.actionGet(); - final PlainActionFuture finFuture = PlainActionFuture.newFuture(); + final PlainActionFuture> finFuture = PlainActionFuture.newFuture(); final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(); - repository.finalizeSnapshot( + repository.finalizeSnapshot(new FinalizeSnapshotContext( shardGenerations, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), @@ -248,7 +249,8 @@ public void testRestoreMinmal() throws IOException { Collections.emptyMap(), 0L, Collections.emptyMap()), - Version.CURRENT, Function.identity(), finFuture); + Version.CURRENT, finFuture + )); finFuture.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); diff --git a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java index eb1411bc1a2ed..49fb12059ec3a 100644 --- a/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java +++ b/x-pack/plugin/repository-encrypted/src/main/java/org/elasticsearch/repositories/encrypted/EncryptedRepository.java @@ -10,10 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedSupplier; @@ -40,10 +36,9 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; -import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.FinalizeSnapshotContext; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryStats; -import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.SnapshotShardContext; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotInfo; @@ -202,34 +197,45 @@ public Map adaptUserMetadata(Map userMetadata) { } @Override - public void finalizeSnapshot( - ShardGenerations shardGenerations, - long repositoryStateId, - Metadata clusterMetadata, - SnapshotInfo snapshotInfo, - Version repositoryMetaVersion, - Function stateTransformer, - ActionListener listener - ) { + public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) { + final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo(); try { validateLocalRepositorySecret(snapshotInfo.userMetadata()); } catch (RepositoryException passwordValidationException) { - listener.onFailure(passwordValidationException); + finalizeSnapshotContext.onFailure(passwordValidationException); return; - } finally { - // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response - // to the user - snapshotInfo.userMetadata().remove(PASSWORD_HASH_USER_METADATA_KEY); - snapshotInfo.userMetadata().remove(PASSWORD_SALT_USER_METADATA_KEY); } + // remove the repository password hash (and salt) from the snapshot metadata so that it is not displayed in the API response + // to the user + final Map updatedUserMetadata = new HashMap<>(snapshotInfo.userMetadata()); + updatedUserMetadata.remove(PASSWORD_HASH_USER_METADATA_KEY); + updatedUserMetadata.remove(PASSWORD_SALT_USER_METADATA_KEY); + final SnapshotInfo updatedSnapshotInfo = new SnapshotInfo( + snapshotInfo.snapshot(), + snapshotInfo.indices(), + snapshotInfo.dataStreams(), + snapshotInfo.featureStates(), + snapshotInfo.reason(), + snapshotInfo.version(), + snapshotInfo.startTime(), + snapshotInfo.endTime(), + snapshotInfo.totalShards(), + snapshotInfo.successfulShards(), + snapshotInfo.shardFailures(), + snapshotInfo.includeGlobalState(), + updatedUserMetadata, + snapshotInfo.state(), + snapshotInfo.indexSnapshotDetails() + ); super.finalizeSnapshot( - shardGenerations, - repositoryStateId, - clusterMetadata, - snapshotInfo, - repositoryMetaVersion, - stateTransformer, - listener + new FinalizeSnapshotContext( + finalizeSnapshotContext.updatedShardGenerations(), + finalizeSnapshotContext.repositoryStateId(), + finalizeSnapshotContext.clusterMetadata(), + updatedSnapshotInfo, + finalizeSnapshotContext.repositoryMetaVersion(), + finalizeSnapshotContext + ) ); }