Skip to content

Commit

Permalink
Fix Concurrent Snapshot Repository Corruption from Operations Queued …
Browse files Browse the repository at this point in the history
…after Failing Operations (#75733)

The node executing a shard-level operation would in many cases communicate a `null` shard generation in its shard state update when the operation failed,
leading follow-up operations to incorrectly assume an empty shard snapshot directory and start from scratch.

closes #75598
  • Loading branch information
original-brownbear authored Jul 27, 2021
1 parent ed7a65e commit f1ba7c4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,52 @@ public void testOutOfOrderCloneFinalization() throws Exception {
);
}

/**
 * Queues a snapshot behind a shard snapshot that is forced to fail on the data node and then checks that
 * the status reported for an earlier, healthy snapshot stays the same while later snapshots are created
 * and deleted in the same repository.
 */
public void testQueuedAfterFailedShardSnapshot() throws Exception {
    final String repoName = "test-repo";
    final String index = "test-idx";
    final String baselineSnapshot = "full-snapshot";
    final String queuedSnapshot = "successful-snapshot";
    final String scratchSnapshot = "snapshot-tmp";

    internalCluster().startMasterOnlyNode();
    final String dataNodeName = internalCluster().startDataOnlyNode();

    createRepository(repoName, "mock");
    createIndexWithContent(index);
    createFullSnapshot(repoName, baselineSnapshot);

    // Add a document so that the next snapshot has new shard data to write.
    indexDoc(index, "some_id", "foo", "bar");

    // First snapshot: held on the data node by the mock repository block and expected to end up PARTIAL.
    blockAndFailDataNode(repoName, dataNodeName);
    final ActionFuture<CreateSnapshotResponse> failingFuture = startFullSnapshot(repoName, "failing-snapshot");
    awaitNumberOfSnapshotsInProgress(1);
    waitForBlock(dataNodeName, repoName);

    // Second snapshot: started while the first one is still blocked.
    final ActionFuture<CreateSnapshotResponse> queuedFuture = startFullSnapshot(repoName, queuedSnapshot);
    awaitNumberOfSnapshotsInProgress(2);
    unblockNode(repoName, dataNodeName);

    assertSuccessful(queuedFuture);
    final SnapshotInfo failedInfo = failingFuture.get().getSnapshotInfo();
    assertEquals(SnapshotState.PARTIAL, failedInfo.state());

    // Reference status of the healthy snapshot; the two later reads must equal it.
    final SnapshotsStatusResponse baselineStatus = clusterAdmin()
        .prepareSnapshotStatus(repoName)
        .setSnapshots(baselineSnapshot)
        .get();

    // Create and immediately delete a throwaway snapshot, then re-read the status.
    createFullSnapshot(repoName, scratchSnapshot);
    assertAcked(startDeleteSnapshot(repoName, scratchSnapshot).get());

    final SnapshotsStatusResponse statusAfterScratchDelete = clusterAdmin()
        .prepareSnapshotStatus(repoName)
        .setSnapshots(baselineSnapshot)
        .get();
    assertEquals(baselineStatus, statusAfterScratchDelete);

    // Delete the snapshot that was queued behind the failure and re-read the status once more.
    assertAcked(startDeleteSnapshot(repoName, queuedSnapshot).get());

    final SnapshotsStatusResponse statusAfterQueuedDelete = clusterAdmin()
        .prepareSnapshotStatus(repoName)
        .setSnapshots(baselineSnapshot)
        .get();
    assertEquals(baselineStatus, statusAfterQueuedDelete);
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
// due to CS batching we might have missed the INIT state and straight went into ABORTED
// notify master that abort has completed by moving to FAILED
if (shard.value.state() == ShardState.ABORTED && localNodeId.equals(shard.value.nodeId())) {
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason(), shard.value.generation());
}
} else {
snapshotStatus.abortIfNotCompleted("snapshot has been aborted");
Expand Down Expand Up @@ -284,7 +284,7 @@ public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
notifyFailedSnapshotShard(snapshot, shardId, failure);
notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation());
}
});
}
Expand Down Expand Up @@ -441,7 +441,12 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
snapshot.snapshot(),
shardId
);
notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure());
notifyFailedSnapshotShard(
snapshot.snapshot(),
shardId,
indexShardSnapshotStatus.getFailure(),
localShard.getValue().generation()
);
}
}
}
Expand All @@ -458,11 +463,11 @@ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardI
}

/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure, final String generation) {
sendSnapshotShardUpdate(
snapshot,
shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null)
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, generation)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,12 @@ private void runReadyClone(
new ShardSnapshotUpdate(
target,
repoShardId,
new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)
new ShardSnapshotStatus(
localNodeId,
ShardState.FAILED,
"failed to clone shard snapshot",
shardStatusBefore.generation()
)
),
ActionListener.runBefore(
ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ public static void blockDataNode(String repository, String nodeName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName).blockOnDataFiles();
}

/**
 * Looks up the {@link MockRepository} instance for {@code repository} on node {@code nodeName} and
 * switches it into block-and-fail mode for data files.
 *
 * @param repository name of the repository to look up
 * @param nodeName   name of the node whose repository instance is affected
 */
public static void blockAndFailDataNode(String repository, String nodeName) {
    final MockRepository mockRepository = AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName);
    mockRepository.blockAndFailOnDataFiles();
}

public static void blockAllDataNodes(String repository) {
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository) repositoriesService.repository(repository)).blockOnDataFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public long getFailureCount() {

private volatile boolean blockOnDataFiles;

private volatile boolean blockAndFailOnDataFiles;

private volatile boolean blockOnDeleteIndexN;

/**
Expand Down Expand Up @@ -214,6 +216,7 @@ public synchronized void unblock() {
blocked = false;
// Clean blocking flags, so we wouldn't try to block again
blockOnDataFiles = false;
blockAndFailOnDataFiles = false;
blockOnAnyFiles = false;
blockAndFailOnWriteIndexFile = false;
blockOnWriteIndexFile = false;
Expand All @@ -229,9 +232,15 @@ public synchronized void unblock() {
}

/**
 * Turns on the flag that makes this repository block on data files.
 * With assertions enabled, trips if block-and-fail mode for data files is already set,
 * because the two modes are mutually exclusive.
 */
public void blockOnDataFiles() {
    assert this.blockAndFailOnDataFiles == false : "Either fail or wait after data file, not both";
    this.blockOnDataFiles = true;
}

/**
 * Turns on the flag that makes this repository block and then fail on data files.
 * With assertions enabled, trips if plain blocking on data files is already set,
 * because the two modes are mutually exclusive.
 */
public void blockAndFailOnDataFiles() {
    assert this.blockOnDataFiles == false : "Either fail or wait after data file, not both";
    this.blockAndFailOnDataFiles = true;
}

public void setBlockOnAnyFiles() {
blockOnAnyFiles = true;
}
Expand Down Expand Up @@ -300,9 +309,9 @@ private synchronized boolean blockExecution() {
logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta ||
blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
|| blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta
|| blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
blocked = true;
this.wait();
wasBlocked = true;
Expand Down Expand Up @@ -382,6 +391,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
}
} else if (blockOnDataFiles) {
blockExecutionAndMaybeWait(blobName);
} else if (blockAndFailOnDataFiles) {
blockExecutionAndFail(blobName);
}
} else {
if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
Expand Down

0 comments on commit f1ba7c4

Please sign in to comment.