From c1fca1255bbdefc3f45fb620b5c46395a692963c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Apr 2020 13:54:42 +0200 Subject: [PATCH] Add Functionality to Consistently Read RepositoryData For CS Updates (#55773) Using optimistic locking, add the ability to run a repository state update task with a consistent view of the current repository data. Allows for a follow-up to remove the snapshot INIT state. --- .../repositories/FilterRepository.java | 8 +++ .../repositories/Repository.java | 17 +++++ .../blobstore/BlobStoreRepository.java | 62 +++++++++++++++++++ .../snapshots/SnapshotsService.java | 61 +++++++++++------- .../RepositoriesServiceTests.java | 7 +++ .../snapshots/SnapshotResiliencyTests.java | 60 +++++++++++++++++- .../index/shard/RestoreOnlyRepository.java | 8 +++ .../xpack/ccr/repository/CcrRepository.java | 8 +++ 8 files changed, 207 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 3a0b8fbc633b5..d41b0dc434e6e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -42,6 +43,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class FilterRepository implements Repository { @@ -146,6 +148,12 @@ public void updateState(ClusterState state) { in.updateState(state); } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + in.executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index c5dc2f8c0fdca..50c667781f144 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -256,6 +258,21 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s */ void updateState(ClusterState state); + /** + * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the + * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state + * at the time the {@code RepositoryData} was loaded. + * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at + * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the + * cluster state. + * + * @param createUpdateTask function to supply cluster state update task + * @param source the source of the cluster state update task + * @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData} + */ + void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure); + /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} * during snapshot initialization. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index f48ef24ae2181..4e6073d5cb52b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -327,6 +328,67 @@ protected void doClose() { } } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + protected void doRun() { + final RepositoryMetadata repositoryMetadataStart = metadata; + getRepositoryData(ActionListener.wrap(repositoryData -> { + final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { + + private boolean executedTask = false; + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + executedTask = true; + return updateTask.execute(currentState); + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + if (executedTask) { + updateTask.onFailure(source, e); + } else { + onFailure.accept(e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (executedTask) { + updateTask.clusterStateProcessed(source, oldState, newState); + } else { + executeConsistentStateUpdate(createUpdateTask, source, onFailure); + } + } + + @Override + public TimeValue timeout() { + return updateTask.timeout(); + } + }); + }, onFailure)); + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }); + } + // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 3197319c4dfdd..ae85e54c17dc4 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -988,7 +988,7 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from * the repository. * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the - * given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}. + * given names in the repository and deletes them. * * @param request delete snapshot request * @param listener listener @@ -1092,18 +1092,23 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (runningSnapshot == null) { - threadPool.generic().execute(ActionRunnable.wrap(listener, l -> - repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> - deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), - repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure)))); + try { + repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> + createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, + repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener), + "delete completed snapshots", listener::onFailure); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + } return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); addListener(runningSnapshot, ActionListener.wrap( result -> { logger.debug("deleted snapshot completed - deleting files"); - deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName, - result.v1().getGenId(), Priority.IMMEDIATE, listener); + clusterService.submitStateUpdateTask("delete snapshot", + createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName, + result.v1().getGenId(), null, Priority.IMMEDIATE, listener)); }, e -> { if (abortedDuringInit) { @@ -1174,23 +1179,33 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh return snapshotEntry; } - /** - * Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state. - * - * @param snapshotIds Snapshots to delete - * @param repoName Repository name - * @param repositoryStateId Repository generation to base the delete on - * @param listener Listener to complete when done - */ - private void deleteCompletedSnapshots(List snapshotIds, String repoName, long repositoryStateId, Priority priority, - ActionListener listener) { + private ClusterStateUpdateTask createDeleteStateUpdate(List snapshotIds, String repoName, long repositoryStateId, + @Nullable TimeValue timeout, Priority priority, ActionListener listener) { + // Short circuit to noop state update if there isn't anything to delete if (snapshotIds.isEmpty()) { - listener.onResponse(null); - return; + return new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + + @Override + public TimeValue timeout() { + return timeout; + } + }; } - logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId, - priority); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { + return new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1246,7 +1261,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } - }); + }; } /** diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index eda8dada6b8ae..2fa49e3ca3950 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -53,6 +54,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.mockito.Mockito.mock; @@ -225,6 +227,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(final ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 0c38191120135..dde2ce8f3ef2f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -202,6 +202,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -220,6 +221,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; public class SnapshotResiliencyTests extends ESTestCase { @@ -506,7 +508,6 @@ public void clusterChanged(ClusterChangedEvent event) { assertEquals(0, snapshotInfo.failedShards()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55702") public void testConcurrentSnapshotCreateAndDeleteOther() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); @@ -738,6 +739,63 @@ public void onFailure(Exception e) { assertEquals(0, snapshotInfo.failedShards()); } + public void testConcurrentDeletes() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + + final Collection> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>()); + + final AtomicInteger successfulDeletes = new AtomicInteger(0); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { + for (StepListener deleteListener : deleteSnapshotStepListeners) { + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute( + ActionListener.wrap( + resp -> deleteListener.onResponse(true), + e -> { + final Throwable unwrapped = ExceptionsHelper.unwrap( + e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class); + assertThat(unwrapped, notNullValue()); + deleteListener.onResponse(false); + })); + } + }); + + for (StepListener deleteListener : deleteSnapshotStepListeners) { + continueOrDie(deleteListener, deleted -> { + if (deleted) { + successfulDeletes.incrementAndGet(); + } + }); + } + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE); + assertFalse(deletionsInProgress.hasDeletionsInProgress()); + final Repository repository = masterNode.repositoriesService.repository(repoName); + final RepositoryData repositoryData = getRepositoryData(repository); + Collection snapshotIds = repositoryData.getSnapshotIds(); + // We end up with no snapshots since at least one of the deletes worked out + assertThat(snapshotIds, empty()); + assertThat(successfulDeletes.get(), either(is(1)).or(is(2))); + // We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1 + assertThat(repositoryData.getGenId(), is(1L)); + } + /** * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 42e3f75dbeeee..2a8d647e71f33 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -44,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -153,4 +155,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) { @Override public void updateState(final ClusterState state) { } + + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for restore-only repository"); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 02e5bdc64bb68..dd566c46f7051 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -92,6 +93,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.Supplier; @@ -433,6 +435,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>();