Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Redundant CS Update on Snapshot Finalization #55276

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -239,12 +240,13 @@ class S3Repository extends BlobStoreRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Metadata clusterMetadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateFilter,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener);
includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateFilter, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FilterRepository implements Repository {

Expand Down Expand Up @@ -79,9 +80,10 @@ public void getRepositoryData(ActionListener<RepositoryData> listener) {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateFilter,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, listener);
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, stateFilter, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,16 @@ default Repository create(RepositoryMetadata metadata, Function<String, Reposito
* @param clusterMetadata cluster metadata
* @param userMetadata user metadata
* @param repositoryMetaVersion version of the updated repository metadata to write
 * @param stateFilter a transformation applied to the final cluster state update executed as part of snapshot
 *                    finalization; used to remove any state tracked for the in-progress snapshot from the cluster state
 * @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo}
 *                 upon completion of the snapshot
*/
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateFilter,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);

/**
* Deletes snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -589,7 +590,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true,
writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
Expand All @@ -603,7 +604,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> {
writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
Expand Down Expand Up @@ -795,7 +796,7 @@ public void cleanup(long repositoryStateId, Version repositoryMetaVersion, Actio
} else {
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion),
ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
}
} catch (Exception e) {
Expand Down Expand Up @@ -894,6 +895,7 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
final Metadata clusterMetadata,
final Map<String, Object> userMetadata,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateFilter,
final ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN :
"Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
Expand All @@ -912,12 +914,13 @@ public void finalizeSnapshot(final SnapshotId snapshotId,
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(writtenRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo));
}, onUpdateFailure));
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateFilter,
ActionListener.wrap(writtenRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo));
}, onUpdateFailure));
}, onUpdateFailure));
}, onUpdateFailure), 2 + indices.size());
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
Expand Down Expand Up @@ -1301,10 +1304,11 @@ public boolean isReadOnly() {
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
 * @param stateFilter transformation applied to the last cluster state update executed by this method
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a filter, rather a transformation function. Perhaps adapt naming?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed :)

* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
ActionListener<RepositoryData> listener) {
Function<ClusterState, ClusterState> stateFilter, ActionListener<RepositoryData> listener) {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
Expand Down Expand Up @@ -1437,10 +1441,10 @@ public ClusterState execute(ClusterState currentState) {
"Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() +
"] after write to generation [" + newGen + "]");
}
return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
return stateFilter.apply(ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
.putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).withUpdatedGeneration(
metadata.name(), newGen, newGen)).build()).build();
metadata.name(), newGen, newGen))).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public ClusterState execute(ClusterState currentState) {
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
removeSnapshotFromClusterState(snapshot.snapshot(), e,
new CleanupAfterErrorListener(userCreateSnapshotListener, e));
}

Expand Down Expand Up @@ -432,7 +432,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]",
snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
removeSnapshotFromClusterState(snapshot.snapshot(), e,
new CleanupAfterErrorListener(userCreateSnapshotListener, e));
}
});
Expand Down Expand Up @@ -826,8 +826,7 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata) {
final Snapshot snapshot = entry.snapshot();
if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
logger.debug("[{}] was aborted before starting", snapshot);
removeSnapshotFromClusterState(entry.snapshot(), null,
new SnapshotException(snapshot, "Aborted on initialization"));
removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
Expand Down Expand Up @@ -862,11 +861,20 @@ protected void doRun() {
metadataForSnapshot(entry, metadata),
entry.userMetadata(),
entry.version(),
ActionListener.wrap(result -> {
final SnapshotInfo snapshotInfo = result.v2();
removeSnapshotFromClusterState(snapshot, result, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
}, this::onFailure));
state -> stateWithoutSnapshot(state, snapshot),
ActionListener.wrap(result -> {
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, result);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
}
endingSnapshots.remove(snapshot);
logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state());
}, this::onFailure));
}

@Override
Expand All @@ -881,53 +889,46 @@ public void onFailure(final Exception e) {
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
} else {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
removeSnapshotFromClusterState(snapshot, e, null);
}
}
});
}

/**
* Removes record of running snapshot from cluster state
* @param snapshot snapshot
* @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful
* @param e exception if snapshot failed, {@code null} otherwise
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception e) {
removeSnapshotFromClusterState(snapshot, snapshotResult, e, null);
/**
 * Returns a cluster state identical to the given one but with any in-progress entry for the
 * given snapshot removed from the {@link SnapshotsInProgress} custom. If no entry for the
 * snapshot exists, the given state instance is returned unchanged.
 *
 * @param state    cluster state to strip the snapshot from
 * @param snapshot snapshot whose in-progress entry should be removed
 * @return updated cluster state, or {@code state} itself if nothing changed
 */
private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
    final SnapshotsInProgress inProgress = state.custom(SnapshotsInProgress.TYPE);
    if (inProgress == null) {
        return state;
    }
    final ArrayList<SnapshotsInProgress.Entry> remaining = new ArrayList<>();
    for (SnapshotsInProgress.Entry entry : inProgress.entries()) {
        if (entry.snapshot().equals(snapshot) == false) {
            remaining.add(entry);
        }
    }
    if (remaining.size() == inProgress.entries().size()) {
        // no entry matched — return the original instance so callers can cheaply detect no-op
        return state;
    }
    return ClusterState.builder(state)
        .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(remaining)))
        .build();
}

/**
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete
* @param snapshot snapshot
* @param failure exception if snapshot failed, {@code null} otherwise
* @param failure exception if snapshot failed
* @param listener listener to notify when snapshot information is removed from the cluster state
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) {
assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied";
private void removeSnapshotFromClusterState(final Snapshot snapshot, Exception failure,
@Nullable CleanupAfterErrorListener listener) {
assert failure != null : "Failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot)) {
changed = true;
} else {
entries.add(entry);
}
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
}
}
return currentState;
return stateWithoutSnapshot(currentState, snapshot);
}

@Override
Expand All @@ -951,20 +952,7 @@ public void onNoLongerMaster(String source) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (snapshotResult == null) {
failSnapshotCompletionListeners(snapshot, failure);
} else {
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, snapshotResult);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
}
endingSnapshots.remove(snapshot);
}
failSnapshotCompletionListeners(snapshot, failure);
if (listener != null) {
listener.onFailure(null);
}
Expand Down
Loading