Skip to content

Commit

Permalink
Refactor Snapshot Finalization Method (#76005) (#76594)
Browse files Browse the repository at this point in the history
This refactors the signature of snapshot finalization. For one it allows removing
a TODO about being dependent on mutable `SnapshotInfo` which was not great but
more importantly this sets up a follow-up where state can be shared between the
cluster state update at the end of finalization and subsequent old-shard-generation
cleanup so that we can resolve another open TODO about leaking shard generation files
in some cases.
  • Loading branch information
original-brownbear authored Aug 17, 2021
1 parent 1b7542d commit 1127473
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand All @@ -32,12 +30,11 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -276,15 +273,18 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();

@Override
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
finalizeSnapshotContext = new FinalizeSnapshotContext(
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
finalizeSnapshotContext.clusterMetadata(),
finalizeSnapshotContext.snapshotInfo(),
finalizeSnapshotContext.repositoryMetaVersion(),
delayedListener(finalizeSnapshotContext)
);
}
super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion,
stateTransformer, listener);
super.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
Expand All @@ -19,17 +15,15 @@
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -89,24 +83,8 @@ public Map<String, Repository.Factory> getRepositories(
private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE);

@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
super.finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
listener
);
public void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext) {
super.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -75,24 +74,8 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
) {
in.finalizeSnapshot(
shardGenerations,
repositoryStateId,
clusterMetadata,
snapshotInfo,
repositoryMetaVersion,
stateTransformer,
listener
);
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
in.finalizeSnapshot(finalizeSnapshotContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.repositories;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;

/**
* Context for finalizing a snapshot.
*/
public final class FinalizeSnapshotContext extends ActionListener.Delegating<
Tuple<RepositoryData, SnapshotInfo>,
Tuple<RepositoryData, SnapshotInfo>> {

private final ShardGenerations updatedShardGenerations;

private final long repositoryStateId;

private final Metadata clusterMetadata;

private final SnapshotInfo snapshotInfo;

private final Version repositoryMetaVersion;

/**
* @param updatedShardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener listener to be invoked with the new {@link RepositoryData} and {@link SnapshotInfo} after completing
* the snapshot
*/
public FinalizeSnapshotContext(
ShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener
) {
super(listener);
this.updatedShardGenerations = updatedShardGenerations;
this.repositoryStateId = repositoryStateId;
this.clusterMetadata = clusterMetadata;
this.snapshotInfo = snapshotInfo;
this.repositoryMetaVersion = repositoryMetaVersion;
}

public long repositoryStateId() {
return repositoryStateId;
}

public ShardGenerations updatedShardGenerations() {
return updatedShardGenerations;
}

public SnapshotInfo snapshotInfo() {
return snapshotInfo;
}

public Version repositoryMetaVersion() {
return repositoryMetaVersion;
}

public Metadata clusterMetadata() {
return clusterMetadata;
}

public ClusterState updatedClusterState(ClusterState state) {
return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
}

@Override
public void onResponse(Tuple<RepositoryData, SnapshotInfo> repositoryData) {
delegate.onResponse(repositoryData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,24 +147,9 @@ public void onFailure(Exception e) {
* <p>
* This method is called on master after all shards are snapshotted.
*
* @param shardGenerations updated shard generations
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
* @param finalizeSnapshotContext finalization context
*/
void finalizeSnapshot(
ShardGenerations shardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<RepositoryData> listener
);
void finalizeSnapshot(FinalizeSnapshotContext finalizeSnapshotContext);

/**
* Deletes snapshots
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.GetSnapshotInfoContext;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
Expand Down Expand Up @@ -1357,15 +1358,10 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices
}

@Override
public void finalizeSnapshot(
final ShardGenerations shardGenerations,
final long repositoryStateId,
final Metadata clusterMetadata,
SnapshotInfo snapshotInfo,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
final ActionListener<RepositoryData> listener
) {
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final ShardGenerations shardGenerations = finalizeSnapshotContext.updatedShardGenerations();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
: "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
final Collection<IndexId> indices = shardGenerations.indices();
Expand All @@ -1374,8 +1370,9 @@ public void finalizeSnapshot(
// directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
// If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
// when writing the index-${N} to each shard directory.
final Version repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion();
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
final Consumer<Exception> onUpdateFailure = e -> listener.onFailure(
final Consumer<Exception> onUpdateFailure = e -> finalizeSnapshotContext.onFailure(
new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)
);

Expand All @@ -1388,7 +1385,7 @@ public void finalizeSnapshot(
repoDataListener.whenComplete(existingRepositoryData -> {
final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size();
if (existingSnapshotCount >= maxSnapshotCount) {
listener.onFailure(
finalizeSnapshotContext.onFailure(
new RepositoryException(
metadata.name(),
"Cannot add another snapshot to this repository as it "
Expand Down Expand Up @@ -1419,23 +1416,16 @@ public void finalizeSnapshot(
snapshotInfo.startTime(),
snapshotInfo.endTime()
);
final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(
snapshotId,
snapshotDetails,
shardGenerations,
indexMetas,
indexMetaIdentifiers
);
writeIndexGen(
updatedRepositoryData,
existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers),
repositoryStateId,
repositoryMetaVersion,
stateTransformer,
finalizeSnapshotContext::updatedClusterState,
ActionListener.wrap(newRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
cleanupOldShardGens(existingRepositoryData, newRepoData);
}
listener.onResponse(newRepoData);
finalizeSnapshotContext.onResponse(Tuple.tuple(newRepoData, snapshotInfo));
}, onUpdateFailure)
);
}, onUpdateFailure), 2 + indices.size());
Expand All @@ -1445,7 +1435,7 @@ public void finalizeSnapshot(
// index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
// that decrements the generation it points at

final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
// Write Global MetaData
executor.execute(
ActionRunnable.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public SnapshotInfo(
);
}

SnapshotInfo(
public SnapshotInfo(
Snapshot snapshot,
List<String> indices,
List<String> dataStreams,
Expand Down Expand Up @@ -458,7 +458,8 @@ public SnapshotInfo(
this.successfulShards = successfulShards;
this.shardFailures = org.elasticsearch.core.List.copyOf(shardFailures);
this.includeGlobalState = includeGlobalState;
this.userMetadata = userMetadata;
this.userMetadata = userMetadata == null ? null : org.elasticsearch.core.Map.copyOf(userMetadata);
;
this.indexSnapshotDetails = org.elasticsearch.core.Map.copyOf(indexSnapshotDetails);
}

Expand Down
Loading

0 comments on commit 1127473

Please sign in to comment.