From 480199df158de7e7e28cf955e6bcdd86f7c91cde Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 May 2020 18:06:13 +0200 Subject: [PATCH] Remove Snapshot INIT Step (#55918) With #55773 the snapshot INIT state step has become obsolete. We can set up the snapshot directly in one single step to simplify the state machine. This is a big help for building concurrent snapshots because it allows us to establish a deterministic order of operations between snapshot create and delete operations since all of their entries now contain a repository generation. With this change simple queuing up of snapshot operations can and will be added in a follow-up. --- .../discovery/SnapshotDisruptionIT.java | 83 ---------- .../SharedClusterSnapshotRestoreIT.java | 6 +- .../create/TransportCreateSnapshotAction.java | 14 +- .../snapshots/SnapshotsService.java | 143 +++++++++++++++++- .../elasticsearch/snapshots/package-info.java | 21 +-- .../snapshots/SnapshotResiliencyTests.java | 14 +- 6 files changed, 168 insertions(+), 113 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 6dc7f51a07037..c05eb8093f83e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.plugins.Plugin; @@ -46,10 +45,6 @@ import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -77,84 +72,6 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } - public void testDisruptionOnSnapshotInitialization() throws Exception { - final String idxName = "test"; - final List allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3); - final String dataNode = internalCluster().startDataOnlyNode(); - ensureStableCluster(4); - - createRandomIndex(idxName); - - logger.info("--> creating repository"); - assertAcked(client().admin().cluster().preparePutRepository("test-repo") - .setType("fs").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); - - // Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization - // by the current master and the former master. It is not causing any issues in real life scenario, but - // might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures. - logger.info("--> initializing the repository"); - assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") - .setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state()); - - final String masterNode1 = internalCluster().getMasterName(); - Set otherNodes = new HashSet<>(); - otherNodes.addAll(allMasterEligibleNodes); - otherNodes.remove(masterNode1); - otherNodes.add(dataNode); - - NetworkDisruption networkDisruption = - new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes), - NetworkDisruption.UNRESPONSIVE); - internalCluster().setDisruptionScheme(networkDisruption); - - ClusterService clusterService = internalCluster().clusterService(masterNode1); - CountDownLatch disruptionStarted = new CountDownLatch(1); - clusterService.addListener(new ClusterStateListener() { - @Override - public void clusterChanged(ClusterChangedEvent event) { - SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE); - if (snapshots != null && snapshots.entries().size() > 0) { - if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) { - // The snapshot started, we can start disruption so the INIT state will arrive to another master node - logger.info("--> starting disruption"); - networkDisruption.startDisrupting(); - clusterService.removeListener(this); - disruptionStarted.countDown(); - } - } - } - }); - - logger.info("--> starting snapshot"); - ActionFuture future = client(masterNode1).admin().cluster() - .prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute(); - - logger.info("--> waiting for disruption to start"); - assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); - - awaitNoMoreRunningOperations(dataNode); - - logger.info("--> verify that snapshot was successful or no longer exist"); - assertBusy(() -> { - try { - assertSnapshotExists("test-repo", "test-snap-2"); - } catch (SnapshotMissingException exception) { - logger.info("--> done verifying, snapshot doesn't exist"); - } - }, 1, TimeUnit.MINUTES); - - logger.info("--> stopping disrupting"); - networkDisruption.stopDisrupting(); - ensureStableCluster(4, masterNode1); - logger.info("--> done"); - - future.get(); - awaitNoMoreRunningOperations(masterNode1); - } - public void testDisruptionAfterFinalization() throws Exception { final String idxName = "test"; internalCluster().startMasterOnlyNodes(3); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 6d89674dd010d..2a97afd8b82c0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2246,8 +2246,8 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception { .put("block_on_data", true)); - String dataStream = "test-ds"; - DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", org.elasticsearch.common.collect.List.of(dataStream)); + String dataStream = "datastream"; + DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", Collections.singletonList(dataStream)); logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { @@ -2272,7 +2272,7 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception { client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{dataStream})).actionGet(); fail("Expected deleting index to fail during snapshot"); } catch (SnapshotInProgressException e) { - assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: [test-ds")); + assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: ["+dataStream)); } finally { logger.info("--> unblock all data nodes"); unblockAllDataNodes("test-repo"); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index fd75abbc207d4..e4241a0266a3e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -73,10 +73,18 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste @Override protected void masterOperation(final CreateSnapshotRequest request, ClusterState state, final ActionListener listener) { - if (request.waitForCompletion()) { - snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); + if (state.nodes().getMinNodeVersion().before(SnapshotsService.NO_REPO_INITIALIZE_VERSION)) { + if (request.waitForCompletion()) { + snapshotsService.executeSnapshotLegacy(request, ActionListener.map(listener, CreateSnapshotResponse::new)); + } else { + snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); + } } else { - snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); + if (request.waitForCompletion()) { + snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); + } else { + snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); + } } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 676fc83c340e0..15bf79c59bd19 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -173,11 +173,13 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN /** * Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of * the snapshot. + * Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards + * compatible path for initializing the snapshot in the repository is executed. * * @param request snapshot request * @param listener snapshot completion listener */ - public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + public void executeSnapshotLegacy(final CreateSnapshotRequest request, final ActionListener listener) { createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure)); } @@ -187,11 +189,13 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis *

* This method is used by clients to start snapshot. It makes sure that there is no snapshots are currently running and * creates a snapshot record in cluster state metadata. + * Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards + * compatible path for initializing the snapshot in the repository is executed. * * @param request snapshot request * @param listener snapshot creation listener */ - public void createSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + public void createSnapshotLegacy(final CreateSnapshotRequest request, final ActionListener listener) { final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); @@ -287,6 +291,139 @@ public TimeValue timeout() { }); } + /** + * Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of + * the snapshot. + * + * @param request snapshot request + * @param listener snapshot completion listener + */ + public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + createSnapshot(request, + ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure)); + } + + /** + * Initializes the snapshotting process. + *

+ * This method is used by clients to start snapshot. It makes sure that there is no snapshots are currently running and + * creates a snapshot record in cluster state metadata. + * + * @param request snapshot request + * @param listener snapshot creation listener + */ + public void createSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { + final String repositoryName = request.repository(); + final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); + validate(repositoryName, snapshotName); + final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Repository repository = repositoriesService.repository(request.repository()); + if (repository.isReadOnly()) { + listener.onFailure( + new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository")); + return; + } + final Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); + repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newEntry; + + @Override + public ClusterState execute(ClusterState currentState) { + // check if the snapshot name already exists in the repository + if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + validate(repositoryName, snapshotName, currentState); + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); + } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + // Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a + // previous master that we can simply ignore and remove from the cluster state because we would clean it up from the + // cluster state anyway in #applyClusterState. + if (snapshots != null && snapshots.entries().stream().anyMatch(entry -> entry.state() != State.INIT)) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + } + // Store newSnapshot here to be processed in clusterStateProcessed + List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request)); + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + + final List indexIds = repositoryData.resolveNewIndices(indices); + final List dataStreams = + indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()); + final Version version = minCompatibleVersion( + clusterService.state().nodes().getMinNodeVersion(), repositoryName, repositoryData, null); + ImmutableOpenMap shards = + shards(currentState, indexIds, useShardGenerations(version), repositoryData); + if (request.partial() == false) { + Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, + currentState.metadata()); + Set missing = indicesWithMissingShards.v1(); + Set closed = indicesWithMissingShards.v2(); + if (missing.isEmpty() == false || closed.isEmpty() == false) { + final StringBuilder failureMessage = new StringBuilder(); + if (missing.isEmpty() == false) { + failureMessage.append("Indices don't have primary shards "); + failureMessage.append(missing); + } + if (closed.isEmpty() == false) { + if (failureMessage.length() > 0) { + failureMessage.append("; "); + } + failureMessage.append("Indices are closed "); + } + // TODO: We should just throw here instead of creating a FAILED and hence useless snapshot in the repository + newEntry = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), false, + State.FAILED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards, + failureMessage.toString(), userMeta, version); + } + } + if (newEntry == null) { + newEntry = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(), + State.STARTED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards, + null, userMeta, version); + } + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + SnapshotsInProgress.of(Collections.singletonList(newEntry))).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + try { + logger.info("snapshot [{}] started", snapshot); + listener.onResponse(snapshot); + } finally { + if (newEntry.state().completed() || newEntry.shards().isEmpty()) { + endSnapshot(newEntry, newState.metadata()); + } + } + } + + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + }, "create_snapshot [" + snapshotName + ']', listener::onFailure); + } + /** * Validates snapshot request * @@ -332,6 +469,8 @@ private static void validate(final String repositoryName, final String snapshotN * Starts snapshot. *

* Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed. + * Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards + * compatible path for initializing the snapshot in the repository is executed. * * @param clusterState cluster state * @param snapshot snapshot meta data diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java index c651b74e69d46..55a0c93433b07 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java +++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java @@ -35,19 +35,14 @@ *

Snapshot Creation

*

Snapshots are created by the following sequence of events:

*
    - *
  1. An invocation of {@link org.elasticsearch.snapshots.SnapshotsService#createSnapshot} enqueues a cluster state update to create - * a {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry} in the cluster state's {@code SnapshotsInProgress}. This initial snapshot - * entry has its state set to {@code INIT} and an empty map set for the state of the individual shard's snapshots.
  2. - * - *
  3. After the snapshot's entry with state {@code INIT} is in the cluster state, {@link org.elasticsearch.snapshots.SnapshotsService} - * determines the primary shards' assignments for all indices that are being snapshotted and updates the existing - * {@code SnapshotsInProgress.Entry} with state {@code STARTED} and adds the map of {@link org.elasticsearch.index.shard.ShardId} to - * {@link org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus} that tracks the assignment of which node is to snapshot which - * shard. All shard snapshots are executed on the shard's primary node. Thus all shards for which the primary node was found to have a - * healthy copy of the shard are marked as being in state {@code INIT} in this map. If the primary for a shard is unassigned, it is marked - * as {@code MISSING} in this map. In case the primary is initializing at this point, it is marked as in state {@code WAITING}. In case a - * shard's primary is relocated at any point after its {@code SnapshotsInProgress.Entry} has moved to state {@code STARTED} and thus been - * assigned to a specific cluster node, that shard's snapshot will fail and move to state {@code FAILED}.
  4. + *
  5. First the {@link org.elasticsearch.snapshots.SnapshotsService} determines the primary shards' assignments for all indices that are + * being snapshotted and creates a {@code SnapshotsInProgress.Entry} with state {@code STARTED} and adds the map of + * {@link org.elasticsearch.index.shard.ShardId} to {@link org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus} that tracks + * the assignment of which node is to snapshot which shard. All shard snapshots are executed on the shard's primary node. Thus all shards + * for which the primary node was found to have a healthy copy of the shard are marked as being in state {@code INIT} in this map. If the + * primary for a shard is unassigned, it is marked as {@code MISSING} in this map. In case the primary is initializing at this point, it is + * marked as in state {@code WAITING}. In case a shard's primary is relocated at any point after its {@code SnapshotsInProgress.Entry} was + * created and thus been assigned to a specific cluster node, that shard's snapshot will fail and move to state {@code FAILED}.
  6. * *
  7. The new {@code SnapshotsInProgress.Entry} is then observed by * {@link org.elasticsearch.snapshots.SnapshotShardsService#clusterChanged} on all nodes and since the entry is in state {@code STARTED} diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6c09a997cbd15..6f1815286deac 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -870,8 +870,6 @@ public void testSnapshotPrimaryRelocations() { continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); - final StepListener snapshotStartedListener = new StepListener<>(); - continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> { final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); final TestClusterNodes.TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId()); @@ -890,7 +888,11 @@ public void run() { scheduleNow(() -> testClusterNodes.stopNode(masterNode)); } testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .execute(snapshotStartedListener); + .execute(ActionListener.wrap(() -> { + createdSnapshot.set(true); + testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot( + new DeleteSnapshotRequest(repoName, snapshotName), noopListener()); + })); scheduleNow( () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute( new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand( @@ -903,12 +905,6 @@ public void run() { }); }); - continueOrDie(snapshotStartedListener, snapshotResponse -> { - createdSnapshot.set(true); - testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), noopListener()); - }); - runUntil(() -> testClusterNodes.randomMasterNode().map(master -> { if (createdSnapshot.get() == false) { return false;