Fix deleting index during snapshot finalization
Today, if an index is deleted during a very specific order of snapshot
finalizations, it's possible that we miscalculate the latest shard
generations for the shards in that index, causing the deletion of a
shard-level `index-UUID` blob and preventing further snapshots of that
shard.

Backports elastic#103817 to 7.17
Closes elastic#101029
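
To make the mechanism concrete, here is a minimal, self-contained Java sketch of the guard this change introduces. It is an illustration only, not the production code: the class name, the map-based bookkeeping, and the simplified hasShardGen helper below are hypothetical stand-ins for the real ShardGenerations machinery. The idea is that a finished shard's new generation may only be carried forward, and its previous generation only treated as obsolete and cleaned up, if the finalization actually wrote a generation for that shard; a shard of a concurrently deleted index writes none, so its previous generation must remain the latest.

// Sketch only: illustrates the guard, not the Elasticsearch implementation.
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ShardGenGuardSketch {

    // Stand-in for ShardGenerations#hasShardGen: true only if the finalization
    // actually recorded a (non-null) generation for this shard.
    static boolean hasShardGen(Map<String, List<String>> finalizedGens, String index, int shard) {
        final List<String> gens = finalizedGens.getOrDefault(index, List.of());
        return shard < gens.size() && gens.get(shard) != null;
    }

    public static void main(String[] args) {
        // Generations written by the finalization that just completed: the
        // concurrently deleted index contributed no generation for its shard.
        final Map<String, List<String>> finalizedGens = Map.of(
            "index-1", List.of("gen-A"),
            "index-to-delete", Arrays.asList((String) null)
        );

        for (String index : List.of("index-1", "index-to-delete")) {
            if (hasShardGen(finalizedGens, index, 0)) {
                // the old generation is genuinely obsolete and may be cleaned up
                System.out.println(index + ": carry the new generation forward to queued snapshots");
            } else {
                // without this check the old generation would be treated as obsolete and its
                // shard-level index-<UUID> blob deleted, although later snapshots still need it
                System.out.println(index + ": no new generation written; keep the previous one as latest");
            }
        }
    }
}

In the actual change this corresponds to the new ShardGenerations#hasShardGen check consulted by SnapshotsService#stateWithoutSnapshot before propagating finished shard generations to queued snapshots, and to passing the written shard generations through handleFinalizationFailure.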
DaveCTurner committed Jan 15, 2024
1 parent b6114f4 commit 277c546
Showing 6 changed files with 176 additions and 16 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103817.yaml
@@ -0,0 +1,6 @@
pr: 103817
summary: Fix deleting index during snapshot finalization
area: Snapshot/Restore
type: bug
issues:
- 101029

@@ -17,26 +17,35 @@
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.AbstractDisruptionTestCase;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Files;
@@ -45,9 +54,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -2014,6 +2026,108 @@ public void testSnapshotAndCloneQueuedAfterMissingShard() throws Exception {
    assertThat(snapshot2.get().getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
}

public void testDeleteIndexWithOutOfOrderFinalization() {

    final String indexToDelete = "index-to-delete";
    final List<String> indexNames = Arrays.asList(indexToDelete, "index-0", "index-1", "index-2");

    for (final String indexName : indexNames) {
        assertAcked(prepareCreate(indexName, indexSettingsNoReplicas(1)));
    }

    final String repoName = "test-repo";
    createRepository(repoName, "fs");

    // block the update-shard-snapshot-status requests so we can execute them in a specific order
    final MockTransportService masterTransportService = (MockTransportService) internalCluster().getCurrentMasterNodeInstance(
        TransportService.class
    );
    final Map<String, ListenableFuture<Void>> otherIndexSnapshotListeners = indexNames.stream()
        .collect(Collectors.toMap(k -> k, k -> new ListenableFuture<>()));
    masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
        SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
        (handler, request, channel, task) -> {
            final String indexName = request.shardId().getIndexName();
            if (indexName.equals(indexToDelete)) {
                handler.messageReceived(request, channel, task);
            } else {
                final ListenableFuture<Void> listener = otherIndexSnapshotListeners.get(indexName);
                assertNotNull(indexName, listener);
                listener.addListener(
                    ActionTestUtils.assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
                );
            }
        }
    );

    // start the snapshots, each targeting index-to-delete and one other index so we can control their finalization order
    final HashMap<String, Runnable> snapshotCompleters = new HashMap<String, Runnable>();
    for (final String blockingIndex : Arrays.asList("index-0", "index-1", "index-2")) {
        final String snapshotName = "snapshot-with-" + blockingIndex;
        final ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true)
            .setPartial(true)
            .setIndices(indexToDelete, blockingIndex)
            .execute();

        // ensure each snapshot has really started before moving on to the next one
        safeAwait(
            ClusterServiceUtils.addTemporaryStateListener(
                internalCluster().getInstance(ClusterService.class),
                cs -> SnapshotsInProgress.get(cs)
                    .forRepo(repoName)
                    .stream()
                    .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals(snapshotName))
            )
        );

        snapshotCompleters.put(blockingIndex, () -> {
            assertFalse(snapshotFuture.isDone());
            otherIndexSnapshotListeners.get(blockingIndex).onResponse(null);
            assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet(10, TimeUnit.SECONDS).getSnapshotInfo().state());
        });
    }

    // set up to delete the index at a very specific moment during finalization
    final MetadataDeleteIndexService masterDeleteIndexService = internalCluster().getCurrentMasterNodeInstance(
        MetadataDeleteIndexService.class
    );
    final var indexRecreatedListener = ClusterServiceUtils
        // wait until the snapshot has entered finalization
        .addTemporaryStateListener(
            internalCluster().getInstance(ClusterService.class),
            cs -> SnapshotsInProgress.get(cs)
                .forRepo(repoName)
                .stream()
                .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-with-index-1") && e.state().completed())
        )
        // execute the index deletion _directly on the master_ so it happens before the snapshot finalization executes
        .andThen((l, ignored) -> masterDeleteIndexService.deleteIndices(new DeleteIndexClusterStateUpdateRequest(l.map(r -> {
            assertTrue(r.isAcknowledged());
            return null;
        })).indices(new Index[] { internalCluster().clusterService().state().metadata().index(indexToDelete).getIndex() })
            .ackTimeout(TimeValue.timeValueSeconds(10))
            .masterNodeTimeout(TimeValue.timeValueSeconds(10))))
        // ultimately create the index again so that taking a full snapshot will pick up any missing shard gen blob, and deleting that
        // full snapshot will clean up all dangling shard-level blobs
        .andThen((l, ignored) -> prepareCreate(indexToDelete, indexSettingsNoReplicas(1)).execute(l.map(r -> {
            assertTrue(r.isAcknowledged());
            return null;
        })));

    // release the snapshots to be finalized, in this order
    for (final String blockingIndex : Arrays.asList("index-1", "index-2", "index-0")) {
        snapshotCompleters.get(blockingIndex).run();
    }

    safeAwait(indexRecreatedListener);
    masterTransportService.clearAllRules();

    // create a full snapshot to verify that the repo is still ok
    createFullSnapshot(repoName, "final-full-snapshot");

    // delete the full snapshot to clean up the leftover shard-level metadata (which trips repo consistency assertions otherwise)
    startDeleteSnapshot(repoName, "final-full-snapshot").actionGet(10, TimeUnit.SECONDS);
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
    final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
        .cluster()

@@ -95,7 +95,7 @@ public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteShardGenerations() {
}

public ClusterState updatedClusterState(ClusterState state) {
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations);
obsoleteGenerations.set(
updatedState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.obsoleteGenerations(snapshotInfo.repository(), state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY))

@@ -141,6 +141,11 @@ public ShardGeneration getShardGen(IndexId indexId, int shardId) {
return generations.get(shardId);
}

public boolean hasShardGen(RepositoryShardId repositoryShardId) {
final List<ShardGeneration> indexShardGens = getGens(repositoryShardId.index());
return repositoryShardId.shardId() < indexShardGens.size() && indexShardGens.get(repositoryShardId.shardId()) != null;
}

public List<ShardGeneration> getGens(IndexId indexId) {
return shardGenerations.getOrDefault(indexId, Collections.emptyList());
}

@@ -1504,6 +1504,7 @@ private void cleanupOldShardGens(
(indexId, gens) -> gens.forEach(
(shardId, oldGen) -> toDelete.add(
shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen
.toBlobNamePart()
)
)
);

@@ -761,7 +761,7 @@ private void startCloning(Repository repository, SnapshotsInProgress.Entry clone
endingSnapshots.add(targetSnapshot);
initializingClones.remove(targetSnapshot);
logger.info(() -> new ParameterizedMessage("Failed to start snapshot clone [{}]", cloneEntry), e);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null);
removeFailedSnapshotFromClusterState(targetSnapshot, e, null, null, ShardGenerations.EMPTY);
};

// 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
@@ -1194,7 +1194,8 @@ public void onFailure(String source, Exception e) {
snapshot.snapshot(),
e,
null,
new CleanupAfterErrorListener(userCreateSnapshotListener, e)
new CleanupAfterErrorListener(userCreateSnapshotListener, e),
ShardGenerations.EMPTY
);
}

@@ -1238,7 +1239,8 @@ public void onFailure(Exception e) {
snapshot.snapshot(),
e,
null,
new CleanupAfterErrorListener(userCreateSnapshotListener, e)
new CleanupAfterErrorListener(userCreateSnapshotListener, e),
ShardGenerations.EMPTY
);
}
});
@@ -1876,14 +1878,21 @@ private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nu
entry.snapshot(),
new SnapshotException(snapshot, "Aborted on initialization"),
repositoryData,
null
null,
ShardGenerations.EMPTY
);
return;
}
if (entry.isClone() && entry.state() == State.FAILED) {
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
if (newFinalization) {
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null, null);
removeFailedSnapshotFromClusterState(
snapshot,
new SnapshotException(snapshot, entry.failure()),
null,
null,
ShardGenerations.EMPTY
);
}
return;
}
@@ -2055,13 +2064,30 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit
completeListenersIgnoringException(endAndGetListenersToResolve(writtenSnapshotInfo.snapshot()), result);
logger.info("snapshot [{}] completed with state [{}]", snapshot, writtenSnapshotInfo.state());
runNextQueuedOperation(result.v1(), repository, true);
}, e -> handleFinalizationFailure(e, snapshot, repositoryData))
},
e -> handleFinalizationFailure(
e,
snapshot,
repositoryData,
// we might have written the new root blob before failing here, so we must use the updated shardGenerations
shardGenerations
)
)
)
);
}, e -> handleFinalizationFailure(e, snapshot, repositoryData));
},
e -> handleFinalizationFailure(
e,
snapshot,
repositoryData,
// a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
// use the updated shardGenerations for all pending shard snapshots
shardGenerations
)
);
} catch (Exception e) {
assert false : new AssertionError(e);
handleFinalizationFailure(e, snapshot, repositoryData);
handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
}
}

@@ -2113,7 +2139,12 @@ private List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> endAndGetListe
* @param snapshot snapshot that failed to finalize
* @param repositoryData current repository data for the snapshot's repository
*/
private void handleFinalizationFailure(Exception e, Snapshot snapshot, RepositoryData repositoryData) {
private void handleFinalizationFailure(
Exception e,
Snapshot snapshot,
RepositoryData repositoryData,
ShardGenerations shardGenerations
) {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
// Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
// will try ending this snapshot again
Expand All @@ -2125,7 +2156,7 @@ private void handleFinalizationFailure(Exception e, Snapshot snapshot, Repositor
failAllListenersOnMasterFailOver(e);
} else {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, null);
removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, null, shardGenerations);
}
}

@@ -2251,7 +2282,7 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
final SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
ClusterState result = state;
int indexOfEntry = -1;
@@ -2312,7 +2343,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
final ShardSnapshotStatus shardState = finishedShardEntry.value;
final RepositoryShardId repositoryShardId = finishedShardEntry.key;
if (shardState.state() != ShardState.SUCCESS
|| previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) {
|| previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false
|| shardGenerations.hasShardGen(finishedShardEntry.key) == false) {
continue;
}
updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -2329,7 +2361,8 @@ public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sna
.shardsByRepoShardId()) {
final ShardSnapshotStatus shardState = finishedShardEntry.value;
if (shardState.state() == ShardState.SUCCESS
&& previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.key)) {
&& previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.key)
&& shardGenerations.hasShardGen(finishedShardEntry.key)) {
updatedShardAssignments = maybeAddUpdatedAssignment(
updatedShardAssignments,
shardState,
@@ -2417,14 +2450,15 @@ private void removeFailedSnapshotFromClusterState(
Snapshot snapshot,
Exception failure,
@Nullable RepositoryData repositoryData,
@Nullable CleanupAfterErrorListener listener
@Nullable CleanupAfterErrorListener listener,
ShardGenerations shardGenerations
) {
assert failure != null : "Failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
