Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Snapshot Initialization #51256

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta
useShardGenerations);
}

public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
failure, entry.userMetadata, useShardGenerations);
}

public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
Expand Down
186 changes: 97 additions & 89 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
* no snapshot is currently running and registers the new snapshot in cluster state</li>
* <li>When cluster state is updated
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
* the {@link #beginSnapshot} method kicks in and initializes
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
Expand Down Expand Up @@ -268,90 +268,85 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

@Override
public ClusterState execute(ClusterState currentState) {
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
newSnapshot = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
snapshotIndices,
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
null,
request.userMetadata(),
hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newSnapshot = null;

private List<String> indices;

@Override
public ClusterState execute(ClusterState currentState) {
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().isEmpty() == false) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
// Store newSnapshot here to be processed in clusterStateProcessed
indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
request.indicesOptions(), request.indices()));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
newSnapshot = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
threadPool.absoluteTimeInMillis(),
RepositoryData.UNKNOWN_REPO_GEN,
null,
request.userMetadata(), false
);
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
if (newSnapshot != null) {
initializingSnapshots.remove(newSnapshot.snapshot());
}
newSnapshot = null;
listener.onFailure(e);
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
if (newSnapshot != null) {
initializingSnapshots.remove(newSnapshot.snapshot());
}
newSnapshot = null;
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current);
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
listener.onResponse(snapshot);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current);
assert indices != null;
beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
listener.onResponse(snapshot);
}

@Override
public void onFailure(final Exception e) {
initializingSnapshots.remove(current);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(final Exception e) {
initializingSnapshots.remove(current);
listener.onFailure(e);
}
});
}
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}, listener::onFailure);
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}

public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
Expand Down Expand Up @@ -436,6 +431,7 @@ private static void validate(final String repositoryName, final String snapshotN
private void beginSnapshot(final ClusterState clusterState,
final SnapshotsInProgress.Entry snapshot,
final boolean partial,
final List<String> indices,
final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

Expand All @@ -460,13 +456,20 @@ protected void doRun() {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}

snapshotCreated = true;

logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) {
final boolean hasOldFormatSnapshots =
hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null);
final boolean writeShardGenerations = hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
if (indices.isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot, clusterState.metaData());
endSnapshot(new SnapshotsInProgress.Entry(
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations,
null), clusterState.metaData());
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
Expand All @@ -486,8 +489,10 @@ public ClusterState execute(ClusterState currentState) {
assert entry.shards().isEmpty();
hadAbortedInitializations = true;
} else {
final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
// Replace the snapshot that was just initialized
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
shards(currentState, indexIds, writeShardGenerations, repositoryData);
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
currentState.metaData());
Expand All @@ -506,12 +511,13 @@ public ClusterState execute(ClusterState currentState) {
failureMessage.append("Indices are closed ");
failureMessage.append(closed);
}
entries.add(
new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString()));
continue;
}
}
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
shards, writeShardGenerations, null));
}
}
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -1493,17 +1499,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
/**
* Calculates the list of shards that should be included into the current snapshot
*
* @param clusterState cluster state
* @param snapshot SnapshotsInProgress Entry
* @param clusterState cluster state
* @param indices Indices to snapshot
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
* @return list of shard to be included into current snapshot
*/
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
SnapshotsInProgress.Entry snapshot,
List<IndexId> indices,
boolean useShardGenerations,
RepositoryData repositoryData) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
MetaData metaData = clusterState.metaData();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
for (IndexId index : snapshot.indices()) {
for (IndexId index : indices) {
final String indexName = index.getName();
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
IndexMetaData indexMetaData = metaData.index(indexName);
Expand All @@ -1516,7 +1524,7 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
final String shardRepoGeneration;
if (snapshot.useShardGenerations()) {
if (useShardGenerations) {
if (isNewIndex) {
assert shardGenerations.getShardGen(index, shardId.getId()) == null
: "Found shard generation for new index [" + index + "]";
Expand Down