From ecf31f3548abf8b53d904d4337353c693ac781bf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Apr 2020 13:54:42 +0200 Subject: [PATCH] Add Functionality to Consistently Read RepositoryData For CS Updates (#55773) Using optimistic locking, add the ability to run a repository state update task with a consistent view of the current repository data. Allows for a follow-up to remove the snapshot INIT state. --- .../delete/TransportDeleteSnapshotAction.java | 4 +- .../repositories/FilterRepository.java | 8 ++ .../repositories/Repository.java | 17 ++++ .../blobstore/BlobStoreRepository.java | 62 +++++++++++++ .../snapshots/SnapshotsService.java | 86 ++++++++++++------- .../RepositoriesServiceTests.java | 7 ++ .../snapshots/SnapshotResiliencyTests.java | 60 +++++++++++++ .../index/shard/RestoreOnlyRepository.java | 8 ++ .../xpack/ccr/repository/CcrRepository.java | 8 ++ 9 files changed, 226 insertions(+), 34 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index 912b492015a69..8eeaef720fa3a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -35,7 +35,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Arrays; /** * Transport action for delete snapshot operation @@ -71,7 +70,6 @@ protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, Cluste @Override protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state, final ActionListener listener) { - snapshotsService.deleteSnapshots(request.repository(), Arrays.asList(request.snapshots()), - ActionListener.map(listener, v -> new AcknowledgedResponse(true))); + snapshotsService.deleteSnapshots(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true))); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 7d36931c1aec8..b6ac4958975c3 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -42,6 +43,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class FilterRepository implements Repository { @@ -151,6 +153,12 @@ public void updateState(ClusterState state) { in.updateState(state); } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + in.executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 8212294cc6b58..1fb3bca5fc696 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -269,6 +271,21 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s */ void updateState(ClusterState state); + /** + * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the + * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state + * at the time the {@code RepositoryData} was loaded. + * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at + * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the + * cluster state. + * + * @param createUpdateTask function to supply cluster state update task + * @param source the source of the cluster state update task + * @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData} + */ + void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure); + /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} * during snapshot initialization. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 71b63b6e74200..57537cbeb4219 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -337,6 +338,67 @@ protected void doClose() { } } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + protected void doRun() { + final RepositoryMetadata repositoryMetadataStart = metadata; + getRepositoryData(ActionListener.wrap(repositoryData -> { + final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { + + private boolean executedTask = false; + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + executedTask = true; + return updateTask.execute(currentState); + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + if (executedTask) { + updateTask.onFailure(source, e); + } else { + onFailure.accept(e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (executedTask) { + updateTask.clusterStateProcessed(source, oldState, newState); + } else { + executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + } + + @Override + public TimeValue timeout() { + return updateTask.timeout(); + } + }); + }, onFailure)); + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }); + } + // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index d137f390d610f..db6033c4115fc 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -1000,14 +1001,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from * the repository. * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the - * given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}. + * given names in the repository and deletes them. * - * @param repositoryName repositoryName - * @param snapshotNames snapshotNames + * @param request delete snapshot request * @param listener listener */ - public void deleteSnapshots(final String repositoryName, final Collection snapshotNames, final ActionListener listener) { - logger.info("deleting snapshots {} from repository [{}]", snapshotNames, repositoryName); + public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener listener) { + + final String[] snapshotNames = request.snapshots(); + final String repositoryName = request.repository(); + logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]", + Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName)); clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) { @@ -1017,15 +1021,15 @@ public void deleteSnapshots(final String repositoryName, final Collection 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) { + if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) { throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ " + MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion() + "]"); } final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); final SnapshotsInProgress.Entry snapshotEntry; - if (snapshotNames.size() == 1) { - final String snapshotName = snapshotNames.iterator().next(); + if (snapshotNames.length == 1) { + final String snapshotName = snapshotNames[0]; if (Regex.isSimpleMatchPattern(snapshotName)) { snapshotEntry = null; } else { @@ -1101,18 +1105,23 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (runningSnapshot == null) { - threadPool.generic().execute(ActionRunnable.wrap(listener, l -> - repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> - deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), - repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure)))); + try { + repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> + createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, + repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener), + "delete completed snapshots", listener::onFailure); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + } return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); addListener(runningSnapshot, ActionListener.wrap( result -> { logger.debug("deleted snapshot completed - deleting files"); - deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName, - result.v1().getGenId(), Priority.IMMEDIATE, listener); + clusterService.submitStateUpdateTask("delete snapshot", + createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName, + result.v1().getGenId(), null, Priority.IMMEDIATE, listener)); }, e -> { if (abortedDuringInit) { @@ -1133,10 +1142,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } )); } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } }); } - private static List matchingSnapshotIds(RepositoryData repositoryData, Collection snapshotsOrPatterns, + private static List matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns, String repositoryName) { final Map allSnapshotIds = repositoryData.getSnapshotIds().stream().collect( Collectors.toMap(SnapshotId::getName, Function.identity())); @@ -1178,23 +1192,33 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh return snapshotEntry; } - /** - * Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state. - * - * @param snapshotIds Snapshots to delete - * @param repoName Repository name - * @param repositoryStateId Repository generation to base the delete on - * @param listener Listener to complete when done - */ - private void deleteCompletedSnapshots(List snapshotIds, String repoName, long repositoryStateId, Priority priority, - ActionListener listener) { + private ClusterStateUpdateTask createDeleteStateUpdate(List snapshotIds, String repoName, long repositoryStateId, + @Nullable TimeValue timeout, Priority priority, ActionListener listener) { + // Short circuit to noop state update if there isn't anything to delete if (snapshotIds.isEmpty()) { - listener.onResponse(null); - return; + return new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + + @Override + public TimeValue timeout() { + return timeout; + } + }; } - logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId, - priority); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { + return new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1250,7 +1274,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } - }); + }; } /** diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 915d5e8a58820..e6bab4428d74c 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -53,6 +54,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.mockito.Mockito.mock; @@ -230,6 +232,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(final ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e40c43a6ba797..a66ce4f275024 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -190,6 +190,7 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -203,6 +204,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -221,6 +223,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; public class SnapshotResiliencyTests extends ESTestCase { @@ -739,6 +742,63 @@ public void onFailure(Exception e) { assertEquals(0, snapshotInfo.failedShards()); } + public void testConcurrentDeletes() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + + final Collection> deleteSnapshotStepListeners = Arrays.asList(new StepListener<>(), new StepListener<>()); + + final AtomicInteger successfulDeletes = new AtomicInteger(0); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { + for (StepListener deleteListener : deleteSnapshotStepListeners) { + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute( + ActionListener.wrap( + resp -> deleteListener.onResponse(true), + e -> { + final Throwable unwrapped = ExceptionsHelper.unwrap( + e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class); + assertThat(unwrapped, notNullValue()); + deleteListener.onResponse(false); + })); + } + }); + + for (StepListener deleteListener : deleteSnapshotStepListeners) { + continueOrDie(deleteListener, deleted -> { + if (deleted) { + successfulDeletes.incrementAndGet(); + } + }); + } + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE); + assertFalse(deletionsInProgress.hasDeletionsInProgress()); + final Repository repository = masterNode.repositoriesService.repository(repoName); + final RepositoryData repositoryData = getRepositoryData(repository); + Collection snapshotIds = repositoryData.getSnapshotIds(); + // We end up with no snapshots since at least one of the deletes worked out + assertThat(snapshotIds, empty()); + assertThat(successfulDeletes.get(), either(is(1)).or(is(2))); + // We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1 + assertThat(repositoryData.getGenId(), is(1L)); + } + /** * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index d479676e40ad9..9c9b144e94cfd 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -44,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -157,4 +159,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) { @Override public void updateState(final ClusterState state) { } + + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for restore-only repository"); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index b056f9c006ff6..40449f6ec724e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -93,6 +94,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.Supplier; @@ -441,6 +443,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>();