From 4e8ab43a3e6ecb9e2592f11ff3f845349c757a8f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 23 Jan 2020 14:29:35 +0100 Subject: [PATCH] Simplify Snapshot Initialization (#51256) (#51344) We were loading `RepositoryData` twice during snapshot initialization, redundantly checking if a snapshot existed already. The first snapshot existence check is somewhat redundant because a snapshot could be created between loading `RepositoryData` and updating the cluster state with the `INIT` state snapshot entry. Also, it is much safer to do the subsequent checks for index existence in the repo and the presence of old version snapshots once the `INIT` state entry prevents further snapshots from being created concurrently. While the current state of things will never lead to corruption on a concurrent snapshot creation, it could result in a situation (though unlikely) where all the snapshot's work is done on the data nodes, only to find out that the repository generation was off during snapshot finalization, failing there and leaving a bunch of dead data in the repository that won't be used in a subsequent snapshot (because the shard generation was never referenced due to the failed snapshot finalization). Note: This is a step on the way to parallel repository operations by making snapshot related CS and repo related CS more tightly correlated. 
--- .../cluster/SnapshotsInProgress.java | 6 + .../snapshots/SnapshotsService.java | 186 +++++++++--------- 2 files changed, 103 insertions(+), 89 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 001562d13e1b2..ebb06e11971c3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta useShardGenerations); } + public Entry(Entry entry, State state, List indices, long repositoryStateId, + ImmutableOpenMap shards, boolean useShardGenerations, String failure) { + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards, + failure, entry.userMetadata, useShardGenerations); + } + public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 8cc7aa42347ab..56a4c0f1e6d83 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -107,7 +107,7 @@ *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • *
  • When cluster state is updated - * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the {@link #beginSnapshot} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#startNewSnapshots} method
  • @@ -274,90 +274,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot - final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { - final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null); - clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - - private SnapshotsInProgress.Entry newSnapshot = null; - - @Override - public ClusterState execute(ClusterState currentState) { - validate(repositoryName, snapshotName, currentState); - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); - } - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); - } - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshot here to be processed in clusterStateProcessed - List indices = 
Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); - logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - List snapshotIndices = repositoryData.resolveNewIndices(indices); - newSnapshot = new SnapshotsInProgress.Entry( - new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), request.partial(), - State.INIT, - snapshotIndices, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - null, - request.userMetadata(), - hasOldFormatSnapshots == false && - clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = new SnapshotsInProgress(newSnapshot); - } else { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); - } - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newSnapshot = null; + + private List indices; + + @Override + public ClusterState execute(ClusterState currentState) { + validate(repositoryName, snapshotName, currentState); + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new 
ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().isEmpty() == false) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + } + // Store newSnapshot here to be processed in clusterStateProcessed + indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, + request.indicesOptions(), request.indices())); + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + newSnapshot = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), request.partial(), + State.INIT, + Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot + threadPool.absoluteTimeInMillis(), + RepositoryData.UNKNOWN_REPO_GEN, + null, + request.userMetadata(), false + ); + initializingSnapshots.add(newSnapshot.snapshot()); + snapshots = new SnapshotsInProgress(newSnapshot); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); - } - newSnapshot = null; - listener.onFailure(e); + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); } + newSnapshot = null; + listener.onFailure(e); + } - 
@Override - public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); - assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { - @Override - public void onResponse(final Snapshot snapshot) { - initializingSnapshots.remove(snapshot); - listener.onResponse(snapshot); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + if (newSnapshot != null) { + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + assert indices != null; + beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } - @Override - public void onFailure(final Exception e) { - initializingSnapshots.remove(current); - listener.onFailure(e); - } - }); - } + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); } + } - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - }); - }, listener::onFailure); + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + }); } public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) { @@ -442,6 +437,7 @@ private static void validate(final String repositoryName, final String snapshotN private void beginSnapshot(final ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, + final List indices, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -474,13 +470,20 @@ protected 
void doRun() { snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData())); } + snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); - if (snapshot.indices().isEmpty()) { + final boolean hasOldFormatSnapshots = + hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null); + final boolean writeShardGenerations = hasOldFormatSnapshots == false && + clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + if (indices.isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot, clusterState.metaData()); + endSnapshot(new SnapshotsInProgress.Entry( + snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations, + null), clusterState.metaData()); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { @@ -500,8 +503,10 @@ public ClusterState execute(ClusterState currentState) { assert entry.shards().isEmpty(); hadAbortedInitializations = true; } else { + final List indexIds = repositoryData.resolveNewIndices(indices); // Replace the snapshot that was just initialized - ImmutableOpenMap shards = shards(currentState, entry, repositoryData); + ImmutableOpenMap shards = + shards(currentState, indexIds, writeShardGenerations, repositoryData); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -520,12 +525,13 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } - entries.add( - new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds, + repositoryData.getGenId(), shards, 
writeShardGenerations, failureMessage.toString())); continue; } } - entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(), + shards, writeShardGenerations, null)); } } return ClusterState.builder(currentState) @@ -1507,17 +1513,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS /** * Calculates the list of shards that should be included into the current snapshot * - * @param clusterState cluster state - * @param snapshot SnapshotsInProgress Entry + * @param clusterState cluster state + * @param indices Indices to snapshot + * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot * @return list of shard to be included into current snapshot */ private static ImmutableOpenMap shards(ClusterState clusterState, - SnapshotsInProgress.Entry snapshot, + List indices, + boolean useShardGenerations, RepositoryData repositoryData) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - for (IndexId index : snapshot.indices()) { + for (IndexId index : indices) { final String indexName = index.getName(); final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; IndexMetaData indexMetaData = metaData.index(indexName); @@ -1530,7 +1538,7 @@ private static ImmutableOpenMap