Skip to content

Commit

Permalink
Simplify Snapshot Initialization (#51256) (#51344)
Browse files Browse the repository at this point in the history
We were loading `RepositoryData` twice during snapshot initialization,
redundantly checking if a snapshot existed already.
The first snapshot existence check is somewhat redundant because a snapshot could be
created between loading `RepositoryData` and updating the cluster state with the `INIT`
state snapshot entry.
Also, it is much safer to do the subsequent checks for index existence in the repo and
and the presence of old version snapshots once the `INIT` state entry prevents further
snapshots from being created concurrently.
While the current state of things will never lead to corruption on a concurrent snapshot
creation, it could result in a situation (though unlikely) where all the snapshot's work
is done on the data nodes, only to find out that the repository generation was off during
snapshot finalization, failing there and leaving a bunch of dead data in the repository
that won't be used in a subsequent snapshot (because the shard generation was never referenced
due to the failed snapshot finalization).

Note: This is a step on the way to parallel repository operations by making snapshot related CS
and repo related CS more tightly correlated.
  • Loading branch information
original-brownbear authored Jan 23, 2020
1 parent 0a8d8d7 commit 4e8ab43
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
useShardGenerations);
}

public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
failure, entry.userMetadata, useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
Expand Down
186 changes: 97 additions & 89 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
* the {@link #beginSnapshot} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
Expand Down Expand Up @@ -274,90 +274,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

@Override
public ClusterState execute(ClusterState currentState) {
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
newSnapshot = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
snapshotIndices,
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
null,
request.userMetadata(),
hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

private List<String> indices;

@Override
public ClusterState execute(ClusterState currentState) {
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().isEmpty() == false) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
// Store newSnapshot here to be processed in clusterStateProcessed
indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
newSnapshot = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
threadPool.absoluteTimeInMillis(),
RepositoryData.UNKNOWN_REPO_GEN,
null,
request.userMetadata(), false
);
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
if (newSnapshot != null) {
initializingSnapshots.remove(newSnapshot.snapshot());
}
newSnapshot = null;
listener.onFailure(e);
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
if (newSnapshot != null) {
initializingSnapshots.remove(newSnapshot.snapshot());
}
newSnapshot = null;
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current);
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<Snapshot>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
listener.onResponse(snapshot);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current);
assert indices != null;
beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<Snapshot>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
listener.onResponse(snapshot);
}

@Override
public void onFailure(final Exception e) {
initializingSnapshots.remove(current);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(final Exception e) {
initializingSnapshots.remove(current);
listener.onFailure(e);
}
});
}
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}, listener::onFailure);
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}

public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
Expand Down Expand Up @@ -442,6 +437,7 @@ private static void validate(final String repositoryName, final String snapshotN
private void beginSnapshot(final ClusterState clusterState,
final SnapshotsInProgress.Entry snapshot,
final boolean partial,
final List<String> indices,
final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

Expand Down Expand Up @@ -474,13 +470,20 @@ protected void doRun() {
snapshot.snapshot().getSnapshotId(), snapshot.indices(),
metaDataForSnapshot(snapshot, clusterState.metaData()));
}

snapshotCreated = true;

logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
final boolean hasOldFormatSnapshots =
hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null);
final boolean writeShardGenerations = hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
if (indices.isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot, clusterState.metaData());
endSnapshot(new SnapshotsInProgress.Entry(
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations,
null), clusterState.metaData());
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
Expand All @@ -500,8 +503,10 @@ public ClusterState execute(ClusterState currentState) {
assert entry.shards().isEmpty();
hadAbortedInitializations = true;
} else {
final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
// Replace the snapshot that was just initialized
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
shards(currentState, indexIds, writeShardGenerations, repositoryData);
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
currentState.metaData());
Expand All @@ -520,12 +525,13 @@ public ClusterState execute(ClusterState currentState) {
failureMessage.append("Indices are closed ");
failureMessage.append(closed);
}
entries.add(
new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString()));
continue;
}
}
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
shards, writeShardGenerations, null));
}
}
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -1507,17 +1513,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
/**
* Calculates the list of shards that should be included into the current snapshot
*
* @param clusterState cluster state
* @param snapshot SnapshotsInProgress Entry
* @param clusterState cluster state
* @param indices Indices to snapshot
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
* @return list of shard to be included into current snapshot
*/
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
SnapshotsInProgress.Entry snapshot,
List<IndexId> indices,
boolean useShardGenerations,
RepositoryData repositoryData) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
MetaData metaData = clusterState.metaData();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
for (IndexId index : snapshot.indices()) {
for (IndexId index : indices) {
final String indexName = index.getName();
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
IndexMetaData indexMetaData = metaData.index(indexName);
Expand All @@ -1530,7 +1538,7 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
final String shardRepoGeneration;
if (snapshot.useShardGenerations()) {
if (useShardGenerations) {
if (isNewIndex) {
assert shardGenerations.getShardGen(index, shardId.getId()) == null
: "Found shard generation for new index [" + index + "]";
Expand Down

0 comments on commit 4e8ab43

Please sign in to comment.