Remove Redundant CS Update during Snapshot Delete
Same as elastic#55276 but for snapshot deletes.
This change folds the removal of the snapshot delete state entry into
the safe generation step where possible.
This means that for repositories that write shard generations, the time
the snapshot delete entry stays in the cluster state is shortened considerably,
reduced to just the time it takes to update the repository metadata.
In this case it is fully safe to run other snapshot operations as soon as
the metadata update completes.

We cannot do this for repositories that do not write shard generations, so those need to
go through a different path and still submit a separate state update task.
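
In both paths the delete now receives a cluster-state transformer that strips the deletion
entry; shard-generation repositories fold it into the repository-generation update, while the
others apply it in a separate task once all shard folders are updated. A condensed sketch of
the transformer and its call site, matching the SnapshotsService changes in the diff below:

    // Condensed from the stateWithoutDelete helper added in this commit: drop the
    // single in-progress delete entry from the cluster state, if one exists.
    private static ClusterState stateWithoutDelete(ClusterState currentState) {
        SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE);
        if (deletions != null && deletions.hasDeletionsInProgress()) {
            assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress";
            return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE,
                deletions.withRemovedEntry(deletions.getEntries().get(0))).build();
        }
        return currentState;
    }

    // Passed as the new stateTransformer argument, so the repository can fold the
    // entry removal into the same update that bumps the repository generation:
    repository.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion,
        SnapshotsService::stateWithoutDelete, listener);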

Also, this PR fixes a problem with the cooldown period for S3 non-shard-generation repos
introduced by elastic#55286. We cannot run the state update outright in the repository because
we enforce the cooldown via the listener wrapping. I fixed this by folding the final
state update into the wrapped listener in this case.
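
In rough terms, the fix defers both the listener and the final cluster-state update until the
cooldown elapses: the repository is handed an identity transformer, and the real transformer is
applied as a cluster-state update task scheduled on the snapshot thread pool. A simplified
sketch of the wrapping, following the S3Repository change in the diff below (the
finalizationFuture bookkeeping and cancellation handling are elided):

    private <T> ActionListener<T> delayedListener(ActionListener<T> listener,
                                                  Function<ClusterState, ClusterState> stateTransformer) {
        return ActionListener.wrap(response -> threadPool.schedule(() ->
            // After the cooldown, run the deferred state update and only then
            // complete the original listener.
            clusterService.submitStateUpdateTask("Delayed s3 repository finalization", new ClusterStateUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) {
                    return stateTransformer.apply(currentState);
                }

                @Override
                public void onFailure(String source, Exception e) {
                    listener.onFailure(e);
                }

                @Override
                public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                    listener.onResponse(response);
                }
            }), coolDown, ThreadPool.Names.SNAPSHOT), listener::onFailure);
    }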
original-brownbear committed Apr 21, 2020
1 parent 30d8e1f commit 960f861

Showing 8 changed files with 88 additions and 44 deletions.
@@ -25,6 +25,7 @@
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -243,26 +244,30 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera
                                  Function<ClusterState, ClusterState> stateTransformer,
                                  ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
         if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
-            listener = delayedListener(listener);
+            listener = delayedListener(listener, stateTransformer);
+            // We're delaying the state update on purpose, so we add it to the listener and just pass a dummy to the repository
+            stateTransformer = Function.identity();
         }
         super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
             includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, listener);
     }

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
         if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
-            listener = delayedListener(listener);
+            listener = delayedListener(listener, stateTransformer);
+            // We're delaying the state update on purpose, so we add it to the listener and just pass a dummy to the repository
+            stateTransformer = Function.identity();
         }
-        super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
+        super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, stateTransformer, listener);
     }

     /**
      * Wraps the given listener such that it is executed with a delay of {@link #coolDown} on the snapshot thread-pool after being invoked.
      * See {@link #COOLDOWN_PERIOD} for details.
      */
-    private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
+    private <T> ActionListener<T> delayedListener(ActionListener<T> listener, Function<ClusterState, ClusterState> stateTransformer) {
         final ActionListener<T> wrappedListener = ActionListener.runBefore(listener, () -> {
             final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
             assert cancellable != null;
@@ -272,8 +277,23 @@ private <T> ActionListener<T> delayedListener(ActionListener<T> listener) {
             public void onResponse(T response) {
                 logCooldownInfo();
                 final Scheduler.Cancellable existing = finalizationFuture.getAndSet(
-                    threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> l.onResponse(response)),
-                        coolDown, ThreadPool.Names.SNAPSHOT));
+                    threadPool.schedule(ActionRunnable.wrap(wrappedListener, l -> clusterService.submitStateUpdateTask(
+                        "Delayed s3 repository finalization", new ClusterStateUpdateTask() {
+                            @Override
+                            public ClusterState execute(ClusterState currentState) {
+                                return stateTransformer.apply(currentState);
+                            }
+
+                            @Override
+                            public void onFailure(String source, Exception e) {
+                                l.onFailure(e);
+                            }
+
+                            @Override
+                            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                                l.onResponse(response);
+                            }
+                        })), coolDown, ThreadPool.Names.SNAPSHOT));
                 assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
             }

@@ -88,8 +88,8 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
-        in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
+        in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, stateTransformer, listener);
     }

     @Override
@@ -145,9 +145,12 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations,
      * @param snapshotId            snapshot id
      * @param repositoryStateId     the unique id identifying the state of the repository when the snapshot deletion began
      * @param repositoryMetaVersion version of the updated repository metadata to write
+     * @param stateTransformer      a function that filters the last cluster state update that the snapshot delete will execute and
+     *                              is used to remove any state tracked for the in-progress snapshot from the cluster state
      * @param listener              completion listener
      */
-    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, ActionListener<Void> listener);
+    void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
+                        Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener);

     /**
      * Returns snapshot throttle time in nanoseconds
@@ -242,7 +242,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

     private final BlobPath basePath;

-    private final ClusterService clusterService;
+    protected final ClusterService clusterService;

    /**
     * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
@@ -495,7 +495,7 @@ public RepositoryStats stats() {

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
         if (isReadOnly()) {
             listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
         } else {
@@ -513,7 +513,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio
                 // delete an index that was created by another master node after writing this index-N blob.
                 final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
                 doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData,
-                    SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
+                    SnapshotsService.useShardGenerations(repositoryMetaVersion), stateTransformer, listener);
             } catch (Exception ex) {
                 listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
             }
@@ -566,11 +566,12 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, Bl
      * @param rootBlobs        All blobs found at the root of the repository before executing any writes to the repository during this
      *                         delete operation
      * @param repositoryData   RepositoryData found in the repository before executing this delete
+     * @param stateTransformer cluster state transformation to apply after the delete, as documented at {@link Repository#deleteSnapshot}
      * @param listener         Listener to invoke once finished
      */
     private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
                                         Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
-                                        ActionListener<Void> listener) {
+                                        Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {

         if (writeShardGens) {
             // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@@ -590,7 +591,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
                     builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
                 }
                 final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
-                writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
+                writeIndexGen(updatedRepoData, repositoryStateId, true, stateTransformer,
                     ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
             }, listener::onFailure);
             // Once we have updated the repository, run the clean-ups
@@ -605,9 +606,26 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
             // Write the new repository data first (with the removed snapshot), using no shard generations
             final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
             writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
-                // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
-                final ActionListener<Void> afterCleanupsListener =
-                    new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
+                // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion and then apply the state transformer to the
+                // cluster state. When not using shard-generations, we must do this after doing all the shard folder updates because
+                // the repository is not safe for concurrent writes until all shard folders have been updated
+                final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() ->
+                    clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() {
+                        @Override
+                        public ClusterState execute(ClusterState currentState) {
+                            return stateTransformer.apply(currentState);
+                        }
+
+                        @Override
+                        public void onFailure(String source, Exception e) {
+                            listener.onFailure(e);
+                        }
+
+                        @Override
+                        public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+                            listener.onResponse(null);
+                        }
+                    })), 2);
                 asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
                 final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
                 writeUpdatedShardMetadataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
@@ -1263,9 +1263,12 @@ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionLis
         repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(),
             repositoryStateId,
             minCompatibleVersion(minNodeVersion, snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()),
+            SnapshotsService::stateWithoutDelete,
             ActionListener.wrap(v -> {
-                logger.info("snapshot [{}] deleted", snapshot);
-                removeSnapshotDeletionFromClusterState(snapshot, null, l);
+                logger.info("Successfully deleted snapshot [{}]", snapshot);
+                if (l != null) {
+                    l.onResponse(null);
+                }
             }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
             )), ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)));
     }));
@@ -1274,25 +1277,13 @@
     /**
      * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state.
      */
-    private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure,
+    private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, final Exception failure,
                                                         @Nullable final ActionListener<Void> listener) {
+        assert failure != null;
         clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
-                SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE);
-                if (deletions != null) {
-                    boolean changed = false;
-                    if (deletions.hasDeletionsInProgress()) {
-                        assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress";
-                        SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0);
-                        deletions = deletions.withRemovedEntry(entry);
-                        changed = true;
-                    }
-                    if (changed) {
-                        return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build();
-                    }
-                }
-                return currentState;
+                return stateWithoutDelete(currentState);
             }

             @Override
@@ -1306,17 +1297,29 @@ public void onFailure(String source, Exception e) {
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 if (listener != null) {
-                    if (failure != null) {
-                        listener.onFailure(failure);
-                    } else {
-                        logger.info("Successfully deleted snapshot [{}]", snapshot);
-                        listener.onResponse(null);
-                    }
+                    listener.onFailure(failure);
                 }
             }
         });
     }

+    private static ClusterState stateWithoutDelete(ClusterState currentState) {
+        SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE);
+        if (deletions != null) {
+            boolean changed = false;
+            if (deletions.hasDeletionsInProgress()) {
+                assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress";
+                SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0);
+                deletions = deletions.withRemovedEntry(entry);
+                changed = true;
+            }
+            if (changed) {
+                return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build();
+            }
+        }
+        return currentState;
+    }
+
     /**
      * Calculates the list of shards that should be included into the current snapshot
      *
@@ -168,7 +168,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, lo

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
         listener.onResponse(null);
     }

@@ -106,7 +106,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
         listener.onResponse(null);
     }

@@ -262,7 +262,7 @@ public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenera

     @Override
     public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
-                               ActionListener<Void> listener) {
+                               Function<ClusterState, ClusterState> stateTransformer, ActionListener<Void> listener) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }

