Fix Snapshot Delete Needlessly Failing on Concurrent Snapshot

We do the delete in three steps here:

1. Get the repository data and find the snapshot ids to delete
2. Put the delete entry in the cluster state
3. Run the actual delete

The test `testConcurrentSnapshotCreateAndDeleteOther` was failing because, between
steps `1.` and `2.`, a full snapshot completed and moved the repository generation ahead
by one. We chose to fail in that case because we expected the repository generation from
step `1.` to still be current in step `3.`.
In the past, using the repository generation from step `1.` made sense as a safety measure,
because a rapid increase in the repository generation could spell trouble on eventually
consistent blob stores. Nowadays, failing here is simply needless: we can rely on the
generation we read from the repository in step `3.` and thus never pass a broken
repository generation to the repository when deleting.
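
For illustration, a minimal sketch of what step `3.` does after this change, assuming the
usual `SnapshotsService` context (`Repository`, `RepositoryData`, `ActionListener`,
`Collectors`); `repositoryMetaVersion` and `listener` are simplified placeholders, and the
actual code lives in `deleteSnapshotsFromRepository` in the diff below:

```java
// Re-read the repository data on the SNAPSHOT thread and use *its* generation,
// instead of the one captured back in step 1.
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
    // Drop ids that a concurrent delete has already removed from the repository.
    final Collection<SnapshotId> toDelete = snapshotIds.stream()
        .filter(repositoryData.getSnapshotIds()::contains)
        .collect(Collectors.toList());
    // Passing the freshly read generation means a concurrent snapshot that bumped
    // the generation between steps 1. and 2. no longer fails the delete.
    repository.deleteSnapshots(toDelete, repositoryData.getGenId(),
        repositoryMetaVersion, listener);
}, listener::onFailure));
```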

NOTE: This exception was always a possibility, but it became far more likely with the
improved/faster snapshot finalization in elastic#55276, which is why it only showed up now.

Closes elastic#55702
original-brownbear committed Apr 24, 2020
1 parent bd22ab2 commit 0f7ad40
Showing 4 changed files with 85 additions and 15 deletions.
@@ -143,8 +143,9 @@ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations,
/**
* Deletes snapshots
*
* @param snapshotIds snapshot ids
* @param snapshotIds snapshot ids to delete (must not be empty)
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* (repository is expected to contain all given {@code snapshotIds} at this generation)
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener completion listener
*/
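
A hedged sketch of how a caller satisfies this contract after the change: the
`repositoryStateId` should be the generation of the same `RepositoryData` that showed the
snapshots to be present (the surrounding names are illustrative, not the production code):

```java
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
    // The contract documented above: a non-empty set of ids, all contained in the
    // repository at the generation we are about to pass as repositoryStateId.
    assert snapshotIds.isEmpty() == false;
    assert repositoryData.getSnapshotIds().containsAll(snapshotIds);
    repository.deleteSnapshots(snapshotIds, repositoryData.getGenId(),
        repositoryMetaVersion, listener);
}, listener::onFailure));
```
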
@@ -498,6 +498,9 @@ public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryS
try {
final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
assert snapshotIds.isEmpty() == false;
assert repositoryData.getSnapshotIds().containsAll(snapshotIds) : "Repository data contained snapshots " +
repositoryData.getSnapshotIds() + " but tried to delete " + snapshotIds;
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
@@ -609,8 +609,7 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) {
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
deleteSnapshotsFromRepository(entry.repository(), entry.getSnapshots(), null, entry.repositoryStateId(),
state.nodes().getMinNodeVersion());
deleteSnapshotsFromRepository(entry.repository(), entry.getSnapshots(), null, state.nodes().getMinNodeVersion());
}
}

@@ -1230,7 +1229,7 @@ public void onFailure(String source, Exception e) {

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, newState.nodes().getMinNodeVersion());
}
});
}
@@ -1295,21 +1294,33 @@ public static boolean useShardGenerations(Version repositoryMetaVersion) {
* @param repoName repository name
* @param snapshotIds snapshot ids
* @param listener listener
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
* @param minNodeVersion minimum node version in the cluster
*/
private void deleteSnapshotsFromRepository(String repoName, Collection<SnapshotId> snapshotIds, @Nullable ActionListener<Void> listener,
long repositoryStateId, Version minNodeVersion) {
Version minNodeVersion) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Repository repository = repositoriesService.repository(repoName);
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds,
repositoryStateId,
minCompatibleVersion(minNodeVersion, repoName, repositoryData, snapshotIds),
ActionListener.wrap(v -> {
logger.info("snapshots {} deleted", snapshotIds);
removeSnapshotDeletionFromClusterState(snapshotIds, null, l);
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)
)), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)));
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
final Collection<SnapshotId> snapshotIdsInRepo = repositoryData.getSnapshotIds();
final Collection<SnapshotId> snapshotIdsToDelete =
snapshotIds.stream().filter(snapshotIdsInRepo::contains).collect(Collectors.toList());
if (snapshotIdsToDelete.size() < snapshotIds.size()) {
logger.info("Some snapshots from {} were concurrently deleted already, only deleting {}",
snapshotIds, snapshotIdsToDelete);
}
if (snapshotIdsToDelete.isEmpty()) {
removeSnapshotDeletionFromClusterState(snapshotIdsToDelete, null, l);
return;
}
repository.deleteSnapshots(snapshotIdsToDelete,
repositoryData.getGenId(),
minCompatibleVersion(minNodeVersion, repoName, repositoryData, snapshotIdsToDelete),
ActionListener.wrap(v -> {
logger.info("snapshots {} deleted", snapshotIdsToDelete);
removeSnapshotDeletionFromClusterState(snapshotIdsToDelete, null, l);
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIdsToDelete, ex, l)
));
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)));
}));
}

@@ -202,6 +202,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -220,6 +221,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;

public class SnapshotResiliencyTests extends ESTestCase {
@@ -506,7 +508,6 @@ public void clusterChanged(ClusterChangedEvent event) {
assertEquals(0, snapshotInfo.failedShards());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55702")
public void testConcurrentSnapshotCreateAndDeleteOther() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

@@ -575,6 +576,60 @@ public void testConcurrentSnapshotCreateAndDeleteOther() {
}
}

public void testConcurrentDeletes() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();

continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));

final Collection<StepListener<Boolean>> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>());

final AtomicInteger successfulDeletes = new AtomicInteger(0);

continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(
ActionListener.wrap(
resp -> deleteListener.onResponse(true),
e -> {
final Throwable unwrapped = ExceptionsHelper.unwrap(
e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class);
assertThat(unwrapped, notNullValue());
deleteListener.onResponse(false);
}));
}
});

for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
continueOrDie(deleteListener, deleted -> {
if (deleted) {
successfulDeletes.incrementAndGet();
}
});
}

deterministicTaskQueue.runAllRunnableTasks();

SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
assertFalse(deletionsInProgress.hasDeletionsInProgress());
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
// We end up with no snapshots since at least one of the deletes worked out
assertThat(snapshotIds, empty());
assertThat(successfulDeletes.get(), either(is(1)).or(is(2)));
}

public void testConcurrentSnapshotRestoreAndDeleteOther() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

