Fix Concurrent Snapshot Repository Corruption from Operations Queued after Failing Operations #75733

Merged

@@ -1611,6 +1611,52 @@ public void testOutOfOrderCloneFinalization() throws Exception {
);
}

public void testQueuedAfterFailedShardSnapshot() throws Exception {
internalCluster().startMasterOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();

final String repository = "test-repo";
createRepository(repository, "mock");

final String indexName = "test-idx";
createIndexWithContent(indexName);
final String fullSnapshot = "full-snapshot";
createFullSnapshot(repository, fullSnapshot);

indexDoc(indexName, "some_id", "foo", "bar");
blockAndFailDataNode(repository, dataNode);
final ActionFuture<CreateSnapshotResponse> snapshotFutureFailure = startFullSnapshot(repository, "failing-snapshot");
awaitNumberOfSnapshotsInProgress(1);
waitForBlock(dataNode, repository);
final ActionFuture<CreateSnapshotResponse> snapshotFutureSuccess = startFullSnapshot(repository, "successful-snapshot");
awaitNumberOfSnapshotsInProgress(2);
unblockNode(repository, dataNode);

assertSuccessful(snapshotFutureSuccess);
final SnapshotInfo failedSnapshot = snapshotFutureFailure.get().getSnapshotInfo();
assertEquals(SnapshotState.PARTIAL, failedSnapshot.state());

// take a baseline status of the original full snapshot before running any further operations against the repository
final SnapshotsStatusResponse snapshotsStatusResponse1 = clusterAdmin().prepareSnapshotStatus(repository)
.setSnapshots(fullSnapshot)
.get();

// creating and then deleting another snapshot on top of the partial one must leave the original snapshot's data intact
final String tmpSnapshot = "snapshot-tmp";
createFullSnapshot(repository, tmpSnapshot);
assertAcked(startDeleteSnapshot(repository, tmpSnapshot).get());

final SnapshotsStatusResponse snapshotsStatusResponse2 = clusterAdmin().prepareSnapshotStatus(repository)
.setSnapshots(fullSnapshot)
.get();
assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse2);

// deleting the queued snapshot that succeeded must likewise not affect the original snapshot
assertAcked(startDeleteSnapshot(repository, "successful-snapshot").get());

final SnapshotsStatusResponse snapshotsStatusResponse3 = clusterAdmin().prepareSnapshotStatus(repository)
.setSnapshots(fullSnapshot)
.get();
assertEquals(snapshotsStatusResponse1, snapshotsStatusResponse3);
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()

@@ -230,7 +230,7 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) {
// due to CS batching we might have missed the INIT state and straight went into ABORTED
// notify master that abort has completed by moving to FAILED
if (shard.value.state() == ShardState.ABORTED && localNodeId.equals(shard.value.nodeId())) {
-    notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
+    notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason(), shard.value.generation());
}
} else {
snapshotStatus.abortIfNotCompleted("snapshot has been aborted");
@@ -284,7 +284,7 @@ public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
}
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
-    notifyFailedSnapshotShard(snapshot, shardId, failure);
+    notifyFailedSnapshotShard(snapshot, shardId, failure, snapshotStatus.generation());
}
});
}
@@ -441,7 +441,12 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
snapshot.snapshot(),
shardId
);
-    notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure());
+    notifyFailedSnapshotShard(
+        snapshot.snapshot(),
+        shardId,
+        indexShardSnapshotStatus.getFailure(),
+        localShard.getValue().generation()
+    );
}
}
}
@@ -458,11 +463,11 @@ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardI
}

/** Notify the master node that the given shard failed to be snapshotted **/
-    private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
+    private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure, final String generation) {
sendSnapshotShardUpdate(
snapshot,
shardId,
-        new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, null)
+        new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure, generation)
);
}


@@ -741,7 +741,12 @@ private void runReadyClone(
new ShardSnapshotUpdate(
target,
repoShardId,
-    new ShardSnapshotStatus(localNodeId, ShardState.FAILED, "failed to clone shard snapshot", null)
+    new ShardSnapshotStatus(
+        localNodeId,
+        ShardState.FAILED,
+        "failed to clone shard snapshot",
+        shardStatusBefore.generation()
+    )

Contributor:
Can we shift some of the @Nullable annotations and associated assertions around in ShardSnapshotStatus? With this change I think we might still receive a null generation over the wire from an older version, but we shouldn't be creating them afresh any more?
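
For illustration, a rough sketch of the kind of annotation/assertion shuffle being suggested here (hypothetical code, not taken from this PR or from the current ShardSnapshotStatus; names follow the usages visible in this diff, imports and remaining members omitted):

// Sketch: tolerate null generations arriving over the wire from older nodes,
// but assert that statuses we build locally for failed shards carry a generation.
public static class ShardSnapshotStatus {
    private final String nodeId;
    private final ShardState state;
    private final String reason;
    @Nullable
    private final String generation; // may still be null when read from an older node

    public ShardSnapshotStatus(String nodeId, ShardState state, String reason, @Nullable String generation) {
        this.nodeId = nodeId;
        this.state = state;
        this.reason = reason;
        this.generation = generation;
        assert state != ShardState.FAILED || generation != null
            : "locally built failed shard statuses should reuse the previous shard generation";
    }
}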

Contributor:
In fact we ought to change the wire format so it's no longer an OptionalString. I'm OK with not doing that here, since it would make the backport that much harder; a follow-up is fine.
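
For illustration, a sketch of what that follow-up could look like (hypothetical, not the actual ShardSnapshotStatus serialization; SOME_FUTURE_VERSION is a placeholder for whichever version first guarantees a non-null generation):

// Sketch: version-gate the move away from the optional encoding.
public void writeTo(StreamOutput out) throws IOException {
    if (out.getVersion().onOrAfter(SOME_FUTURE_VERSION)) {
        out.writeString(generation);           // always present from this version on
    } else {
        out.writeOptionalString(generation);   // older nodes still use the optional encoding
    }
    // ... remaining fields written as before ...
}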

Member Author:
We would in fact still create them for pre-7.6 state-machine operations (still a thing if there's an old snapshot in your repo), and we don't have access to the snapshot version in these constructors. In those cases null means "figure out the numeric generation yourself", which is what would happen to a queued operation if e.g. the first operation for a shard in the cluster state fails.

Let me see what I can do about this though :)

Member Author (@original-brownbear, Jul 27, 2021):
Bad news here, I'm afraid. I had to remove the assertion since it didn't hold up, and it's also really hard to assert this elsewhere because of the BwC situation: we can't neatly do it in ShardSnapshotStatus without refactoring its construction, and doing it elsewhere is tricky as well since there are so many call sites right now.
If it's OK with you, I'd rather look for a cleaner way of asserting this once #75501 has landed (or as part of incorporating this change into that one) and simply assert that we no longer make any illegal changes to SnapshotsInProgress where a non-null generation becomes a null generation for a given shard (much easier once we don't have to hack around translating ShardId and RepoShardId all over the place).
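
For reference, a sketch of the sort of invariant described here (hypothetical helper, not code from this PR or from #75501; uses ImmutableOpenMap and the hppc ObjectObjectCursor iteration style seen elsewhere in this diff):

// Sketch: when the shard map of a SnapshotsInProgress entry is rebuilt, no shard that
// already had a generation may end up with a null one. Intended to be wrapped in an
// `assert assertNoShardGenerationWasLost(oldShards, newShards);` call.
private static boolean assertNoShardGenerationWasLost(
    ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> before,
    ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> after
) {
    for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : before) {
        final SnapshotsInProgress.ShardSnapshotStatus updated = after.get(shard.key);
        assert updated == null || shard.value.generation() == null || updated.generation() != null
            : "shard " + shard.key + " lost its generation [" + shard.value.generation() + "]";
    }
    return true;
}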

Contributor:
Sure, thanks for looking. Blasted BwC, always spoiling the fun.

),
ActionListener.runBefore(
ActionListener.wrap(

@@ -256,6 +256,10 @@ public static void blockDataNode(String repository, String nodeName) {
AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName).blockOnDataFiles();
}

public static void blockAndFailDataNode(String repository, String nodeName) {
    AbstractSnapshotIntegTestCase.<MockRepository>getRepositoryOnNode(repository, nodeName).blockAndFailOnDataFiles();
}

Member Author:
The fact that we didn't have this logic revealed an unfortunate lack of test coverage. We have a number of tests that simulate data-node failure, but they're all based on blocking the data node via the existing block-and-wait helper and then shutting down the blocked data node, which triggers a very different code path on master.
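
A minimal usage sketch of the new helper, condensed from the test added above (names are the ones used there); the point is that the data node stays up and reports a shard-level failure to master, rather than being shut down while blocked:

final String dataNode = internalCluster().startDataOnlyNode();
createRepository("test-repo", "mock");
createIndexWithContent("test-idx");

blockAndFailDataNode("test-repo", dataNode);   // fail data-file writes instead of merely blocking them
final ActionFuture<CreateSnapshotResponse> failing = startFullSnapshot("test-repo", "failing-snapshot");
waitForBlock(dataNode, "test-repo");
unblockNode("test-repo", dataNode);

// the snapshot completes as PARTIAL because the shard snapshot on the data node failed
assertEquals(SnapshotState.PARTIAL, failing.get().getSnapshotInfo().state());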

public static void blockAllDataNodes(String repository) {
for (RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository) repositoriesService.repository(repository)).blockOnDataFiles();

@@ -114,6 +114,8 @@ public long getFailureCount() {

private volatile boolean blockOnDataFiles;

private volatile boolean blockAndFailOnDataFiles;

private volatile boolean blockOnDeleteIndexN;

/**
@@ -214,6 +216,7 @@ public synchronized void unblock() {
blocked = false;
// Clean blocking flags, so we wouldn't try to block again
blockOnDataFiles = false;
blockAndFailOnDataFiles = false;
blockOnAnyFiles = false;
blockAndFailOnWriteIndexFile = false;
blockOnWriteIndexFile = false;
@@ -229,9 +232,15 @@ public synchronized void unblock() {
}

public void blockOnDataFiles() {
assert blockAndFailOnDataFiles == false : "Either fail or wait after data file, not both";
blockOnDataFiles = true;
}

public void blockAndFailOnDataFiles() {
assert blockOnDataFiles == false : "Either fail or wait after data file, not both";
blockAndFailOnDataFiles = true;
}

public void setBlockOnAnyFiles() {
blockOnAnyFiles = true;
}
@@ -300,9 +309,9 @@ private synchronized boolean blockExecution() {
logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false;
try {
-    while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
-        blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta ||
-        blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
+    while (blockAndFailOnDataFiles || blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile
+        || blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta
+        || blockAndFailOnReadSnapFile || blockAndFailOnReadIndexFile || blockedIndexId != null) {
blocked = true;
this.wait();
wasBlocked = true;
@@ -382,6 +391,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
}
} else if (blockOnDataFiles) {
blockExecutionAndMaybeWait(blobName);
} else if (blockAndFailOnDataFiles) {
blockExecutionAndFail(blobName);
}
} else {
if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {