diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 7ddecd8b2fdb3..8438fdda96dc5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta useShardGenerations); } + public Entry(Entry entry, State state, List indices, long repositoryStateId, + ImmutableOpenMap shards, boolean useShardGenerations, String failure) { + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards, + failure, entry.userMetadata, useShardGenerations); + } + public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 45967235e8086..9762beac4eff7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -107,7 +107,7 @@ *
  • On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that * no snapshot is currently running and registers the new snapshot in cluster state
  • *
  • When cluster state is updated - * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes + * the {@link #beginSnapshot} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state
  • *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes * start processing them through {@link SnapshotShardsService#startNewSnapshots} method
  • @@ -268,90 +268,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot - final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { - final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null); - clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - - private SnapshotsInProgress.Entry newSnapshot = null; - - @Override - public ClusterState execute(ClusterState currentState) { - validate(repositoryName, snapshotName, currentState); - SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); - } - final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, - "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); - } - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null || snapshots.entries().isEmpty()) { - // Store newSnapshot here to be processed in clusterStateProcessed - List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); - logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - List snapshotIndices = repositoryData.resolveNewIndices(indices); - newSnapshot = new SnapshotsInProgress.Entry( - new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), request.partial(), - State.INIT, - snapshotIndices, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - null, - request.userMetadata(), - hasOldFormatSnapshots == false && - clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = new SnapshotsInProgress(newSnapshot); - } else { - throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); - } - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { + + private SnapshotsInProgress.Entry newSnapshot = null; + + private List indices; + + @Override + public ClusterState execute(ClusterState currentState) { + validate(repositoryName, snapshotName, currentState); + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]"); } + final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]"); + } + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().isEmpty() == false) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); + } + // Store newSnapshot here to be processed in clusterStateProcessed + indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, + request.indicesOptions(), request.indices())); + logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); + newSnapshot = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), request.partial(), + State.INIT, + Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot + threadPool.absoluteTimeInMillis(), + RepositoryData.UNKNOWN_REPO_GEN, + null, + request.userMetadata(), false + ); + initializingSnapshots.add(newSnapshot.snapshot()); + snapshots = new SnapshotsInProgress(newSnapshot); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); - } - newSnapshot = null; - listener.onFailure(e); + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); + if (newSnapshot != null) { + initializingSnapshots.remove(newSnapshot.snapshot()); } + newSnapshot = null; + listener.onFailure(e); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); - assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() { - @Override - public void onResponse(final Snapshot snapshot) { - initializingSnapshots.remove(snapshot); - listener.onResponse(snapshot); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { + if (newSnapshot != null) { + final Snapshot current = newSnapshot.snapshot(); + assert initializingSnapshots.contains(current); + assert indices != null; + beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<>() { + @Override + public void onResponse(final Snapshot snapshot) { + initializingSnapshots.remove(snapshot); + listener.onResponse(snapshot); + } - @Override - public void onFailure(final Exception e) { - initializingSnapshots.remove(current); - listener.onFailure(e); - } - }); - } + @Override + public void onFailure(final Exception e) { + initializingSnapshots.remove(current); + listener.onFailure(e); + } + }); } + } - @Override - public TimeValue timeout() { - return request.masterNodeTimeout(); - } - }); - }, listener::onFailure); + @Override + public TimeValue timeout() { + return request.masterNodeTimeout(); + } + }); } public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) { @@ -436,6 +431,7 @@ private static void validate(final String repositoryName, final String snapshotN private void beginSnapshot(final ClusterState clusterState, final SnapshotsInProgress.Entry snapshot, final boolean partial, + final List indices, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -460,13 +456,20 @@ protected void doRun() { throw new InvalidSnapshotNameException( repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); } + snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); - if (snapshot.indices().isEmpty()) { + final boolean hasOldFormatSnapshots = + hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null); + final boolean writeShardGenerations = hasOldFormatSnapshots == false && + clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + if (indices.isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot, clusterState.metaData()); + endSnapshot(new SnapshotsInProgress.Entry( + snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations, + null), clusterState.metaData()); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { @@ -486,8 +489,10 @@ public ClusterState execute(ClusterState currentState) { assert entry.shards().isEmpty(); hadAbortedInitializations = true; } else { + final List indexIds = repositoryData.resolveNewIndices(indices); // Replace the snapshot that was just initialized - ImmutableOpenMap shards = shards(currentState, entry, repositoryData); + ImmutableOpenMap shards = + shards(currentState, indexIds, writeShardGenerations, repositoryData); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -506,12 +511,13 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } - entries.add( - new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds, + repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString())); continue; } } - entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); + entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(), + shards, writeShardGenerations, null)); } } return ClusterState.builder(currentState) @@ -1493,17 +1499,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS /** * Calculates the list of shards that should be included into the current snapshot * - * @param clusterState cluster state - * @param snapshot SnapshotsInProgress Entry + * @param clusterState cluster state + * @param indices Indices to snapshot + * @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot * @return list of shard to be included into current snapshot */ private static ImmutableOpenMap shards(ClusterState clusterState, - SnapshotsInProgress.Entry snapshot, + List indices, + boolean useShardGenerations, RepositoryData repositoryData) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); final ShardGenerations shardGenerations = repositoryData.shardGenerations(); - for (IndexId index : snapshot.indices()) { + for (IndexId index : indices) { final String indexName = index.getName(); final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; IndexMetaData indexMetaData = metaData.index(indexName); @@ -1516,7 +1524,7 @@ private static ImmutableOpenMap