Fix deleting index during snapshot finalization
Today, if an index is deleted during a very specific order of snapshot
finalizations, it's possible we'll miscalculate the latest shard
generations for the shards in that index, causing the deletion of a
shard-level `index-UUID` blob, which in turn prevents further snapshots
of that shard.

Closes elastic#101029
DaveCTurner committed Jan 2, 2024
1 parent 4744215 commit 7c64634
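
The mechanism is easiest to see in miniature. Below is a minimal, self-contained sketch of the invariant the fix enforces in SnapshotsService.stateWithoutSnapshot: a finished shard generation may only be carried forward into later snapshot entries while the index it describes still exists. All names in the sketch are illustrative stand-ins, not the Elasticsearch APIs touched by this commit.

import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names) of the guard this commit adds: when a completed
// snapshot entry is removed from the cluster state, its shard generations are
// propagated to later entries only if the snapshotted index still exists.
public class CarryForwardGuardSketch {

    private final Map<String, String> indexUuids = new HashMap<>(); // stand-in for cluster metadata
    private final Map<String, String> carriedGenerations = new HashMap<>(); // stand-in for later entries' generations

    void indexCreated(String name, String uuid) {
        indexUuids.put(name, uuid);
    }

    void indexDeleted(String name) {
        indexUuids.remove(name);
    }

    // Called as a finished snapshot entry is removed from the cluster state.
    void carryForwardShardGeneration(String indexName, String expectedUuid, String newGeneration) {
        // The essence of the fix: skip shards whose index is gone, or was
        // recreated under a new UUID, rather than propagating a generation that
        // no longer describes a live shard. Without this check, a later
        // finalization computes stale "latest" generations and cleanup then
        // deletes the shard-level index-UUID blob the live shard still needs.
        final String currentUuid = indexUuids.get(indexName);
        if (currentUuid == null || currentUuid.equals(expectedUuid) == false) {
            return;
        }
        carriedGenerations.put(indexName, newGeneration);
    }

    public static void main(String[] args) {
        final CarryForwardGuardSketch sketch = new CarryForwardGuardSketch();
        sketch.indexCreated("index-to-delete", "uuid-1");
        sketch.carryForwardShardGeneration("index-to-delete", "uuid-1", "gen-A"); // propagated
        sketch.indexDeleted("index-to-delete");
        sketch.indexCreated("index-to-delete", "uuid-2"); // concurrently recreated
        sketch.carryForwardShardGeneration("index-to-delete", "uuid-1", "gen-B"); // skipped by the guard
        System.out.println(sketch.carriedGenerations); // prints {index-to-delete=gen-A}
    }
}

In the real change below, the check is the new indexExists helper in SnapshotsService, applied in both shard-assignment propagation loops of stateWithoutSnapshot.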
Showing 3 changed files with 112 additions and 2 deletions.
@@ -17,11 +17,14 @@
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
@@ -36,6 +39,7 @@
 import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -48,10 +52,12 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;

 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
@@ -2060,6 +2066,102 @@ public void testQueuedSnapshotAfterPartialWithIndexRecreate() throws Exception {
         assertSuccessful(partialFuture);
     }

+    public void testDeleteIndexWithOutOfOrderFinalization() {
+
+        final var indexToDelete = "index-to-delete";
+        final var indexNames = List.of(indexToDelete, "index-0", "index-1", "index-2");
+
+        for (final var indexName : indexNames) {
+            assertAcked(prepareCreate(indexName, indexSettingsNoReplicas(1)));
+        }
+
+        final var repoName = "test-repo";
+        createRepository(repoName, "fs");
+
+        // block the update-shard-snapshot-status requests so we can execute them in a specific order
+        final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
+        final Map<String, SubscribableListener<Void>> otherIndexSnapshotListeners = indexNames.stream()
+            .collect(Collectors.toMap(k -> k, k -> new SubscribableListener<>()));
+        masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
+            SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
+            (handler, request, channel, task) -> {
+                final var indexName = request.shardId().getIndexName();
+                if (indexName.equals(indexToDelete)) {
+                    handler.messageReceived(request, channel, task);
+                } else {
+                    final var listener = otherIndexSnapshotListeners.get(indexName);
+                    assertNotNull(indexName, listener);
+                    listener.addListener(
+                        ActionTestUtils.assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
+                    );
+                }
+            }
+        );
+
+        // start the snapshots, each targeting index-to-delete and one other index so we can control their finalization order
+        final var snapshotCompleters = new HashMap<String, Runnable>();
+        for (final var blockingIndex : List.of("index-0", "index-1", "index-2")) {
+            final var snapshotName = "snapshot-with-" + blockingIndex;
+            final var snapshotFuture = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
+                .setWaitForCompletion(true)
+                .setPartial(true)
+                .setIndices(indexToDelete, blockingIndex)
+                .execute();
+
+            // ensure each snapshot has really started before moving on to the next one
+            safeAwait(
+                ClusterServiceUtils.addTemporaryStateListener(
+                    internalCluster().getInstance(ClusterService.class),
+                    cs -> SnapshotsInProgress.get(cs)
+                        .forRepo(repoName)
+                        .stream()
+                        .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals(snapshotName))
+                )
+            );
+
+            snapshotCompleters.put(blockingIndex, () -> {
+                assertFalse(snapshotFuture.isDone());
+                otherIndexSnapshotListeners.get(blockingIndex).onResponse(null);
+                assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet(10, TimeUnit.SECONDS).getSnapshotInfo().state());
+            });
+        }
+
+        // set up to delete the index at a very specific moment during finalization
+        final var masterIndicesClient = internalCluster().masterClient().admin().indices();
+        final var indexRecreatedListener = ClusterServiceUtils
+            // wait until the snapshot has entered finalization
+            .addTemporaryStateListener(
+                internalCluster().getInstance(ClusterService.class),
+                cs -> SnapshotsInProgress.get(cs)
+                    .forRepo(repoName)
+                    .stream()
+                    .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-with-index-1") && e.state().completed())
+            )
+            // execute the index deletion _directly on the master_ so it happens before the snapshot finalization executes
+            .andThen((l, ignored) -> masterIndicesClient.prepareDelete(indexToDelete).execute(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })))
+            .andThen((l, ignored) -> prepareCreate(indexToDelete, indexSettingsNoReplicas(1)).execute(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })));
+
+        // release the snapshots to be finalized, in this order
+        for (final var blockingIndex : List.of("index-1", "index-2", "index-0")) {
+            snapshotCompleters.get(blockingIndex).run();
+        }
+
+        safeAwait(indexRecreatedListener);
+        masterTransportService.clearAllRules();
+
+        // create a full snapshot to verify that the repo is still ok
+        createFullSnapshot(repoName, "final-full-snapshot");
+
+        // delete the full snapshot to clean up the leftover shard-level metadata (which trips repo consistency assertions otherwise)
+        startDeleteSnapshot(repoName, "final-full-snapshot").actionGet(10, TimeUnit.SECONDS);
+    }
+
     private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
         final SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus(otherBlockedRepoName).get();
         final List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();

@@ -1741,6 +1741,7 @@ private void cleanupOldMetadata(
             (indexId, gens) -> gens.forEach(
                 (shardId, oldGen) -> toDelete.add(
                     shardPath(indexId, shardId).buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen
+                        .toBlobNamePart()
                 )
             )
         );

@@ -1762,7 +1762,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
                     if (shardState.state() != ShardState.SUCCESS
-                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) {
+                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false
+                        || indexExists(removedEntry, state.metadata(), finishedShardEntry.getKey().indexName()) == false) {
                         continue;
                     }
                     updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -1779,7 +1780,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
                     .entrySet()) {
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     if (shardState.state() == ShardState.SUCCESS
-                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())) {
+                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())
+                        && indexExists(removedEntry, state.metadata(), finishedShardEntry.getKey().indexName())) {
                         updatedShardAssignments = maybeAddUpdatedAssignment(
                             updatedShardAssignments,
                             shardState,
@@ -1802,6 +1804,11 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
         return readyDeletions(result).v1();
     }

+    private static boolean indexExists(SnapshotsInProgress.Entry snapshotsInProgressEntry, Metadata metadata, String indexName) {
+        final var index = snapshotsInProgressEntry.indexByName(indexName);
+        return index != null && metadata.index(index) != null;
+    }
+
     private static void addSnapshotEntry(
         List<SnapshotsInProgress.Entry> entries,
         SnapshotsInProgress.Entry entryToUpdate,
