Fix Snapshot State Machine Issues around Failed Clones #76419

Merged
@@ -19,6 +19,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
@@ -736,6 +737,66 @@ public void testManyConcurrentClonesStartOutOfOrder() throws Exception {
assertAcked(clone2.get());
}

public void testRemoveFailedCloneFromCSWithoutIO() throws Exception {
final String masterNode = internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
blockAndFailMasterOnShardClone(repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex);
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
expectThrows(SnapshotException.class, cloneFuture::actionGet);
awaitNoMoreRunningOperations();
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 1);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

public void testRemoveFailedCloneFromCSWithQueuedSnapshotInProgress() throws Exception {
// single threaded master snapshot pool so we can selectively fail part of a clone by letting it run shard by shard
final String masterNode = internalCluster().startMasterOnlyNode(
Settings.builder().put("thread_pool.snapshot.core", 1).put("thread_pool.snapshot.max", 1).build()
);
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
createRepository(repoName, "mock");
final String testIndex = "index-test";
final String testIndex2 = "index-test-2";
createIndexWithContent(testIndex);
createIndexWithContent(testIndex2);

final String sourceSnapshot = "source-snapshot";
createFullSnapshot(repoName, sourceSnapshot);

final String targetSnapshot = "target-snapshot";
blockAndFailMasterOnShardClone(repoName);

createIndexWithContent("test-index-3");
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture1 = startFullSnapshot(repoName, "full-snapshot-1");
waitForBlock(dataNode, repoName);
final ActionFuture<AcknowledgedResponse> cloneFuture = startClone(repoName, sourceSnapshot, targetSnapshot, testIndex, testIndex2);
awaitNumberOfSnapshotsInProgress(2);
waitForBlock(masterNode, repoName);
unblockNode(repoName, masterNode);
final ActionFuture<CreateSnapshotResponse> fullSnapshotFuture2 = startFullSnapshot(repoName, "full-snapshot-2");
expectThrows(SnapshotException.class, cloneFuture::actionGet);
unblockNode(repoName, dataNode);
awaitNoMoreRunningOperations();
assertSuccessful(fullSnapshotFuture1);
assertSuccessful(fullSnapshotFuture2);
assertAllSnapshotsSuccessful(getRepositoryData(repoName), 3);
assertAcked(startDeleteSnapshot(repoName, sourceSnapshot).get());
}

private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(
String repoName,
String sourceSnapshot,
@@ -772,6 +833,10 @@ private void blockMasterOnShardClone(String repoName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockOnWriteShardLevelMeta();
}

private void blockAndFailMasterOnShardClone(String repoName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnMaster(repoName).setBlockAndFailOnWriteShardLevelMeta();
}

/**
* Assert that given {@link RepositoryData} contains exactly the given number of snapshots and all of them are successful.
*/
@@ -79,7 +79,7 @@ public Metadata clusterMetadata() {
}

public ClusterState updatedClusterState(ClusterState state) {
return SnapshotsService.stateWithoutSuccessfulSnapshot(state, snapshotInfo.snapshot());
return SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
}

@Override
@@ -1376,12 +1376,14 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn
*/
private void endSnapshot(SnapshotsInProgress.Entry entry, Metadata metadata, @Nullable RepositoryData repositoryData) {
final Snapshot snapshot = entry.snapshot();
final boolean newFinalization = endingSnapshots.add(snapshot);
if (entry.isClone() && entry.state() == State.FAILED) {
logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
if (newFinalization) {
Contributor:
Do we have a test for this new condition? I deleted it and ran a few likely-looking suites and also all of :server:ictest but didn't see any failures.

Author:
We do not, and I'm not sure how I could deterministically reproduce this at the moment. You can technically get here in scenarios where the master fails over twice in a row, though that would need to happen concurrently with finalizing a snapshot queued before the clone ... I'll think about a way to set this up. For now I added this to be defensive and to avoid pointless CS updates (technically these updates should be idempotent anyway, because they become no-ops as soon as the snapshot to be removed is no longer in the CS).
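
For illustration, a minimal self-contained sketch of the guard pattern under discussion; the class and method names here are hypothetical, not the actual SnapshotsService code:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified illustration of the newFinalization guard: Set#add
// returns true only for the first caller, so a second attempt to finalize the
// same snapshot becomes a no-op instead of submitting a redundant cluster
// state update.
class FinalizationGuard {
    private final Set<String> endingSnapshots = ConcurrentHashMap.newKeySet();

    boolean tryStartFinalization(String snapshot) {
        return endingSnapshots.add(snapshot); // false if already being finalized
    }

    public static void main(String[] args) {
        FinalizationGuard guard = new FinalizationGuard();
        System.out.println(guard.tryStartFinalization("repo/target-snapshot")); // true
        System.out.println(guard.tryStartFinalization("repo/target-snapshot")); // false -> skip the removal update
    }
}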

removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
Author:
We could do a possible follow-up here to remove data from partially failed clones from the repo. But since clones only really fail like this due to IO exceptions, and IO exceptions are really unlikely unless something is seriously broken, I'm not sure it's worth the effort: the cleanup would probably also fail (and would happen on a subsequent delete run anyway). Since we aren't writing a new index-N (so this isn't really relevant to the scalability of the repo) and clones by their very nature add almost no bytes to the repo, I think this is good enough for now.

}
return;
}
final boolean newFinalization = endingSnapshots.add(snapshot);
final String repoName = snapshot.getRepository();
if (tryEnterRepoLoop(repoName)) {
if (repositoryData == null) {
@@ -1741,15 +1743,15 @@ private static Tuple<ClusterState, List<SnapshotDeletionsInProgress.Entry>> read
}

/**
* Computes the cluster state resulting from removing a given snapshot create operation that was finalized in the repository from the
* given state. This method will update the shard generations of snapshots that the given snapshot depended on so that finalizing them
* will not cause rolling back to an outdated shard generation.
* Computes the cluster state resulting from removing a given snapshot create operation from the given state. This method will update
* the shard generations of snapshots that the given snapshot depended on so that finalizing them will not cause rolling back to an
* outdated shard generation.
*
* @param state current cluster state
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
public static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, Snapshot snapshot) {
public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
// TODO: updating snapshots here leaks their outdated generation files, we should add logic to clean those up and enhance
// BlobStoreTestUtil to catch this leak
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
@@ -1904,32 +1906,6 @@ private static <T> ImmutableOpenMap.Builder<T, ShardSnapshotStatus> maybeAddUpda
return updatedShardAssignments;
}

/**
* Computes the cluster state resulting from removing a given snapshot create operation from the given state after it has failed at
* any point before being finalized in the repository.
*
* @param state current cluster state
* @param snapshot snapshot for which to remove the snapshot operation
* @return updated cluster state
*/
private static ClusterState stateWithoutFailedSnapshot(ClusterState state, Snapshot snapshot) {
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
ClusterState result = state;
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot)) {
changed = true;
} else {
entries.add(entry);
}
}
if (changed) {
result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
}
return readyDeletions(result).v1();
}

/**
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete. This method is only
* used when the snapshot fails for some reason. During normal operation the snapshot repository will remove the
@@ -1946,7 +1922,7 @@ private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception f

@Override
public ClusterState execute(ClusterState currentState) {
final ClusterState updatedState = stateWithoutFailedSnapshot(currentState, snapshot);
final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
assert updatedState == currentState || endingSnapshots.contains(snapshot)
: "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
// now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
@@ -135,6 +135,8 @@ public long getFailureCount() {

private volatile boolean blockOnWriteShardLevelMeta;

private volatile boolean blockAndFailOnWriteShardLevelMeta;

private volatile boolean blockOnReadIndexMeta;

private final AtomicBoolean blockOnceOnReadSnapshotInfo = new AtomicBoolean(false);
@@ -224,6 +226,7 @@ public synchronized void unblock() {
blockedIndexId = null;
blockOnDeleteIndexN = false;
blockOnWriteShardLevelMeta = false;
blockAndFailOnWriteShardLevelMeta = false;
blockOnReadIndexMeta = false;
blockOnceOnReadSnapshotInfo.set(false);
blockAndFailOnReadSnapFile = false;
@@ -268,9 +271,15 @@ public void setBlockOnDeleteIndexFile() {
}

public void setBlockOnWriteShardLevelMeta() {
assert blockAndFailOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
blockOnWriteShardLevelMeta = true;
}

public void setBlockAndFailOnWriteShardLevelMeta() {
Author:
Not great to add yet another method to this, but this case turns out to be important. Reproducing this the way we used to, via blocking and then restarting the master, simply did not cover this obviously broken spot.

assert blockOnWriteShardLevelMeta == false : "Either fail or wait after blocking on shard level metadata, not both";
blockAndFailOnWriteShardLevelMeta = true;
}

public void setBlockOnReadIndexMeta() {
blockOnReadIndexMeta = true;
}
@@ -310,8 +319,8 @@ private synchronized boolean blockExecution() {
boolean wasBlocked = false;
try {
while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta
|| blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockAndFailOnWriteShardLevelMeta
|| blockOnReadIndexMeta || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
blocked = true;
this.wait();
wasBlocked = true;
@@ -555,9 +564,12 @@ public void writeBlob(String blobName,

private void beforeWrite(String blobName) throws IOException {
maybeIOExceptionOrBlock(blobName);
if (blockOnWriteShardLevelMeta && blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)
&& path().equals(basePath()) == false) {
blockExecutionAndMaybeWait(blobName);
if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX) && path().equals(basePath()) == false) {
if (blockOnWriteShardLevelMeta) {
blockExecutionAndMaybeWait(blobName);
} else if (blockAndFailOnWriteShardLevelMeta) {
blockExecutionAndFail(blobName);
}
}
}
