From 095b2692aac8fa7586b80f5ef481569142ea69ba Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 25 Apr 2020 17:35:49 +0200 Subject: [PATCH 1/4] Add Functionality to Consistently Update RepositoryData Using optimistic locking, add the ability to run a repository state update task with a consistent view of the current repository data. Allows for a follow-up to remove the snapshot init state. Closes #55702 --- .../repositories/FilterRepository.java | 8 +++ .../repositories/Repository.java | 15 +++++ .../blobstore/BlobStoreRepository.java | 55 +++++++++++++++++ .../snapshots/SnapshotsService.java | 51 +++++++++------- .../RepositoriesServiceTests.java | 7 +++ .../snapshots/SnapshotResiliencyTests.java | 60 ++++++++++++++++++- .../index/shard/RestoreOnlyRepository.java | 7 +++ .../xpack/ccr/repository/CcrRepository.java | 8 +++ 8 files changed, 187 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 3a0b8fbc633b5..60b6cad8533f7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -42,6 +43,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; public class FilterRepository implements Repository { @@ -146,6 +148,12 @@ public void updateState(ClusterState state) { in.updateState(state); } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, + Consumer onFailure) { + in.executeConsistentStateUpdate(createUpdateTask, onFailure); + } + @Override public Lifecycle.State lifecycleState() { return in.lifecycleState(); diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index c5dc2f8c0fdca..3f0b53835b67a 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; /** @@ -256,6 +258,19 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s */ void updateState(ClusterState state); + /** + * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the + * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state + * at the time the {@code RepositoryData} was loaded. + * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at + * one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the + * cluster state. + * + * @param createUpdateTask function to supply cluster state update task + * @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData} + */ + void executeConsistentStateUpdate(Function createUpdateTask, Consumer onFailure); + /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} * during snapshot initialization. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4675d71023d8c..32e0efbeade17 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -330,6 +330,61 @@ protected void doClose() { } } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, + Consumer onFailure) { + final RepositoryMetadata repositoryMetadataStart = metadata; + threadPool.generic().execute(new AbstractRunnable() { + + @Override + protected void doRun() { + getRepositoryData(ActionListener.wrap(repositoryData -> + clusterService.submitStateUpdateTask("consistent state update", new ClusterStateUpdateTask() { + + private ClusterStateUpdateTask updateTask; + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + updateTask = createUpdateTask.apply(repositoryData); + return updateTask.execute(currentState); + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + if (updateTask == null) { + onFailure.accept(e); + } else { + updateTask.onFailure(source, e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (updateTask == null) { + executeConsistentStateUpdate(createUpdateTask, onFailure); + } else { + updateTask.clusterStateProcessed(source, oldState, newState); + } + } + }), onFailure)); + } + + @Override + public void onFailure(Exception e) { + onFailure.accept(e); + } + }); + } + // Inspects all cluster state elements that contain a hint about what the current repository generation is and updates // #latestKnownRepoGen if a newer than currently known generation is found @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5e00ddbd545ce..11be313071d84 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -982,7 +982,7 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { * If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from * the repository. * If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the - * given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}. + * given names in the repository and deletes them. * * @param repositoryName repositoryName * @param snapshotNames snapshotNames @@ -1083,18 +1083,18 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (runningSnapshot == null) { - threadPool.generic().execute(ActionRunnable.wrap(listener, l -> - repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> - deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), - repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure)))); + repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> + createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, + repositoryData.getGenId(), Priority.NORMAL, listener), listener::onFailure); return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); addListener(runningSnapshot, ActionListener.wrap( result -> { logger.debug("deleted snapshot completed - deleting files"); - deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName, - result.v1().getGenId(), Priority.IMMEDIATE, listener); + clusterService.submitStateUpdateTask("delete snapshot", + createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName, + result.v1().getGenId(), Priority.IMMEDIATE, listener)); }, e -> { if (abortedDuringInit) { @@ -1160,23 +1160,28 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh return snapshotEntry; } - /** - * Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state. - * - * @param snapshotIds Snapshots to delete - * @param repoName Repository name - * @param repositoryStateId Repository generation to base the delete on - * @param listener Listener to complete when done - */ - private void deleteCompletedSnapshots(List snapshotIds, String repoName, long repositoryStateId, Priority priority, - ActionListener listener) { + private ClusterStateUpdateTask createDeleteStateUpdate(List snapshotIds, String repoName, long repositoryStateId, + Priority priority, ActionListener listener) { + // Short circuit to noop state update if there isn't anything to delete if (snapshotIds.isEmpty()) { - listener.onResponse(null); - return; + return new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(null); + } + }; } - logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId, - priority); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { + return new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1232,7 +1237,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } - }); + }; } /** diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index eda8dada6b8ae..0747619b669a0 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -53,6 +54,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static org.mockito.Mockito.mock; @@ -225,6 +227,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(final ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, + Consumer onFailure) { + } + @Override public Lifecycle.State lifecycleState() { return null; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 4ddd2dec5bb28..c6edd70784cd0 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -202,6 +202,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -220,6 +221,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; public class SnapshotResiliencyTests extends ESTestCase { @@ -506,7 +508,6 @@ public void clusterChanged(ClusterChangedEvent event) { assertEquals(0, snapshotInfo.failedShards()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55702") public void testConcurrentSnapshotCreateAndDeleteOther() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); @@ -738,6 +739,63 @@ public void onFailure(Exception e) { assertEquals(0, snapshotInfo.failedShards()); } + public void testConcurrentDeletes() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener)); + + final Collection> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>()); + + final AtomicInteger successfulDeletes = new AtomicInteger(0); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { + for (StepListener deleteListener : deleteSnapshotStepListeners) { + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute( + ActionListener.wrap( + resp -> deleteListener.onResponse(true), + e -> { + final Throwable unwrapped = ExceptionsHelper.unwrap( + e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class); + assertThat(unwrapped, notNullValue()); + deleteListener.onResponse(false); + })); + } + }); + + for (StepListener deleteListener : deleteSnapshotStepListeners) { + continueOrDie(deleteListener, deleted -> { + if (deleted) { + successfulDeletes.incrementAndGet(); + } + }); + } + + deterministicTaskQueue.runAllRunnableTasks(); + + SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE); + assertFalse(deletionsInProgress.hasDeletionsInProgress()); + final Repository repository = masterNode.repositoriesService.repository(repoName); + final RepositoryData repositoryData = getRepositoryData(repository); + Collection snapshotIds = repositoryData.getSnapshotIds(); + // We end up with no snapshots since at least one of the deletes worked out + assertThat(snapshotIds, empty()); + assertThat(successfulDeletes.get(), either(is(1)).or(is(2))); + // We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1 + assertThat(repositoryData.getGenId(), is(1L)); + } + /** * Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently * deleting a snapshot. diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 42e3f75dbeeee..c44e8a66bf09a 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -44,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import static java.util.Collections.emptyList; @@ -153,4 +155,9 @@ public void verify(String verificationToken, DiscoveryNode localNode) { @Override public void updateState(final ClusterState state) { } + + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, + Consumer onFailure) { + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 02e5bdc64bb68..a44a51ecdbb49 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -24,6 +24,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -92,6 +93,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.Supplier; @@ -433,6 +435,12 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In public void updateState(ClusterState state) { } + @Override + public void executeConsistentStateUpdate(Function createUpdateTask, + Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); + } + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, Client followerClient, Index followerIndex) { final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>(); From 7fbebe2d82f200eab83ed4132c85a0cbb8827fb8 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 26 Apr 2020 18:10:06 +0200 Subject: [PATCH 2/4] more efficient --- .../repositories/blobstore/BlobStoreRepository.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 32e0efbeade17..fb9543c93a860 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -333,11 +333,10 @@ protected void doClose() { @Override public void executeConsistentStateUpdate(Function createUpdateTask, Consumer onFailure) { - final RepositoryMetadata repositoryMetadataStart = metadata; threadPool.generic().execute(new AbstractRunnable() { - @Override protected void doRun() { + final RepositoryMetadata repositoryMetadataStart = metadata; getRepositoryData(ActionListener.wrap(repositoryData -> clusterService.submitStateUpdateTask("consistent state update", new ClusterStateUpdateTask() { From b46783b91c782a2556337c061aed946be8123e57 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 Apr 2020 10:02:33 +0200 Subject: [PATCH 3/4] changes --- .../org/elasticsearch/snapshots/SnapshotsService.java | 10 +++++++--- .../index/shard/RestoreOnlyRepository.java | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index b70e6d1f5b46d..fc7fc10dca04f 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1088,9 +1088,13 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (runningSnapshot == null) { - repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> - createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, - repositoryData.getGenId(), Priority.NORMAL, listener), listener::onFailure); + try { + repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> + createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, + repositoryData.getGenId(), Priority.NORMAL, listener), listener::onFailure); + } catch (RepositoryMissingException e) { + listener.onFailure(e); + } return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index c44e8a66bf09a..fa831df78fac9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -159,5 +159,6 @@ public void updateState(final ClusterState state) { @Override public void executeConsistentStateUpdate(Function createUpdateTask, Consumer onFailure) { + throw new UnsupportedOperationException("Unsupported for restore-only repository"); } } From e1427e042d734a77335dc2aeae7791d37d32b2d1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Apr 2020 12:41:13 +0200 Subject: [PATCH 4/4] timeout + source --- .../repositories/FilterRepository.java | 4 +- .../repositories/Repository.java | 4 +- .../blobstore/BlobStoreRepository.java | 72 ++++++++++--------- .../snapshots/SnapshotsService.java | 12 +++- .../RepositoriesServiceTests.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 2 +- 7 files changed, 57 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 60b6cad8533f7..d41b0dc434e6e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -149,9 +149,9 @@ public void updateState(ClusterState state) { } @Override - public void executeConsistentStateUpdate(Function createUpdateTask, + public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { - in.executeConsistentStateUpdate(createUpdateTask, onFailure); + in.executeConsistentStateUpdate(createUpdateTask, source, onFailure); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 3f0b53835b67a..50c667781f144 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -267,9 +267,11 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s * cluster state. * * @param createUpdateTask function to supply cluster state update task + * @param source the source of the cluster state update task * @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData} */ - void executeConsistentStateUpdate(Function createUpdateTask, Consumer onFailure); + void executeConsistentStateUpdate(Function createUpdateTask, String source, + Consumer onFailure); /** * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2fa47937c4ca9..4e6073d5cb52b 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -76,6 +76,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -328,50 +329,57 @@ protected void doClose() { } @Override - public void executeConsistentStateUpdate(Function createUpdateTask, + public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { threadPool.generic().execute(new AbstractRunnable() { @Override protected void doRun() { final RepositoryMetadata repositoryMetadataStart = metadata; - getRepositoryData(ActionListener.wrap(repositoryData -> - clusterService.submitStateUpdateTask("consistent state update", new ClusterStateUpdateTask() { + getRepositoryData(ActionListener.wrap(repositoryData -> { + final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData); + clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) { - private ClusterStateUpdateTask updateTask; + private boolean executedTask = false; - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - // Comparing the full metadata here on purpose instead of simply comparing the safe generation. - // If the safe generation has changed, then we have to reload repository data and start over. - // If the pending generation has changed we are in the midst of a write operation and might pick up the - // updated repository data and state on the retry. We don't want to wait for the write to finish though - // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state - // to change in any form. - if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { - updateTask = createUpdateTask.apply(repositoryData); - return updateTask.execute(currentState); - } - return currentState; + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // Comparing the full metadata here on purpose instead of simply comparing the safe generation. + // If the safe generation has changed, then we have to reload repository data and start over. + // If the pending generation has changed we are in the midst of a write operation and might pick up the + // updated repository data and state on the retry. We don't want to wait for the write to finish though + // because it could fail for any number of reasons so we just retry instead of waiting on the cluster state + // to change in any form. + if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) { + executedTask = true; + return updateTask.execute(currentState); } + return currentState; + } - @Override - public void onFailure(String source, Exception e) { - if (updateTask == null) { - onFailure.accept(e); - } else { - updateTask.onFailure(source, e); - } + @Override + public void onFailure(String source, Exception e) { + if (executedTask) { + updateTask.onFailure(source, e); + } else { + onFailure.accept(e); } + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (updateTask == null) { - executeConsistentStateUpdate(createUpdateTask, onFailure); - } else { - updateTask.clusterStateProcessed(source, oldState, newState); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (executedTask) { + updateTask.clusterStateProcessed(source, oldState, newState); + } else { + executeConsistentStateUpdate(createUpdateTask, source, onFailure); } - }), onFailure)); + } + + @Override + public TimeValue timeout() { + return updateTask.timeout(); + } + }); + }, onFailure)); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 61088a70a62f1..ae85e54c17dc4 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1095,7 +1095,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS try { repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData -> createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName, - repositoryData.getGenId(), Priority.NORMAL, listener), listener::onFailure); + repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener), + "delete completed snapshots", listener::onFailure); } catch (RepositoryMissingException e) { listener.onFailure(e); } @@ -1107,7 +1108,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.debug("deleted snapshot completed - deleting files"); clusterService.submitStateUpdateTask("delete snapshot", createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName, - result.v1().getGenId(), Priority.IMMEDIATE, listener)); + result.v1().getGenId(), null, Priority.IMMEDIATE, listener)); }, e -> { if (abortedDuringInit) { @@ -1179,7 +1180,7 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh } private ClusterStateUpdateTask createDeleteStateUpdate(List snapshotIds, String repoName, long repositoryStateId, - Priority priority, ActionListener listener) { + @Nullable TimeValue timeout, Priority priority, ActionListener listener) { // Short circuit to noop state update if there isn't anything to delete if (snapshotIds.isEmpty()) { return new ClusterStateUpdateTask() { @@ -1197,6 +1198,11 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { listener.onResponse(null); } + + @Override + public TimeValue timeout() { + return timeout; + } }; } return new ClusterStateUpdateTask(priority) { diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 0747619b669a0..2fa49e3ca3950 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -228,7 +228,7 @@ public void updateState(final ClusterState state) { } @Override - public void executeConsistentStateUpdate(Function createUpdateTask, + public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index fa831df78fac9..2a8d647e71f33 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -157,7 +157,7 @@ public void updateState(final ClusterState state) { } @Override - public void executeConsistentStateUpdate(Function createUpdateTask, + public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { throw new UnsupportedOperationException("Unsupported for restore-only repository"); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index a44a51ecdbb49..dd566c46f7051 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -436,7 +436,7 @@ public void updateState(ClusterState state) { } @Override - public void executeConsistentStateUpdate(Function createUpdateTask, + public void executeConsistentStateUpdate(Function createUpdateTask, String source, Consumer onFailure) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); }