Skip to content

Commit

Permalink
Make Snapshot Delete Concurrency Exception Consistent (#49266)
Browse files Browse the repository at this point in the history
We shouldn't be throwing `RepositoryException` when the repository
wasn't concurrently modified in an unexpected fashion (i.e. on the blob/file level).
When we know that the repository generation has already advanced past the generation
tracked in the master's memory, we should throw the concurrent snapshot exception.

This change makes concurrent snapshot create and delete always throw the same exception,
prevents unnecessary listings when the generation is known to be off and prevents
future test failures in SLM tests that assume the concurrent snapshot exception
is always thrown here.
Without this change, the newly added test randomly fails the `instanceOf` assertion
by running into a `RepositoryException`.
  • Loading branch information
original-brownbear authored Nov 19, 2019
1 parent 68870ac commit 6685581
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -365,6 +367,13 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
final long latestKnownGen = latestKnownRepoGen.get();
if (latestKnownGen > repositoryStateId) {
listener.onFailure(new ConcurrentSnapshotExecutionException(
new Snapshot(metadata.name(), snapshotId), "Another concurrent operation moved repo generation to [ " + latestKnownGen
+ "] but this delete assumed generation [" + repositoryStateId + "]"));
return;
}
try {
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -417,6 +418,74 @@ public void testConcurrentSnapshotCreateAndDelete() {
assertEquals(0, snapshotInfo.failedShards());
}

/**
 * Creates a snapshot, then starts a second snapshot and, while it is running, attempts to
 * delete the first one. The delete either succeeds (in which case a third snapshot is
 * created afterwards) or fails with a {@link ConcurrentSnapshotExecutionException} — any
 * other failure type is a bug. Finally verifies that every snapshot left in the repository
 * completed successfully.
 */
public void testConcurrentSnapshotCreateAndDeleteOther() {
    setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

    String repositoryName = "repo";
    String firstSnapshotName = "snapshot";
    final String indexName = "test";
    final int shardCount = randomIntBetween(1, 10);

    TestClusterNodes.TestClusterNode master =
        testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

    final StepListener<CreateSnapshotResponse> firstSnapshotListener = new StepListener<>();

    // Step 1: create the repository and index, then take the first snapshot and wait for it.
    continueOrDie(createRepoAndIndex(master, repositoryName, indexName, shardCount),
        createIndexResponse -> master.client.admin().cluster().prepareCreateSnapshot(repositoryName, firstSnapshotName)
            .setWaitForCompletion(true).execute(firstSnapshotListener));

    final StepListener<CreateSnapshotResponse> secondSnapshotListener = new StepListener<>();

    // Step 2: kick off a second snapshot without waiting for completion so it can race with the delete.
    continueOrDie(firstSnapshotListener,
        createSnapshotResponse -> master.client.admin().cluster().prepareCreateSnapshot(repositoryName, "snapshot-2")
            .execute(secondSnapshotListener));

    final StepListener<Boolean> deleteListener = new StepListener<>();

    // Step 3: try to delete the first snapshot while "snapshot-2" may still be in flight.
    // A failure is only acceptable if it is the concurrent-snapshot exception.
    continueOrDie(secondSnapshotListener,
        createSnapshotResponse -> master.client.admin().cluster().deleteSnapshot(
            new DeleteSnapshotRequest(repositoryName, firstSnapshotName), ActionListener.wrap(
                resp -> deleteListener.onResponse(true),
                e -> {
                    assertThat(e, instanceOf(ConcurrentSnapshotExecutionException.class));
                    deleteListener.onResponse(false);
                })));

    final StepListener<CreateSnapshotResponse> thirdSnapshotListener = new StepListener<>();

    // Step 4: if the delete went through, reuse the freed-up name for a third snapshot.
    continueOrDie(deleteListener, deleted -> {
        if (deleted == false) {
            // The delete lost the race; no third snapshot gets created.
            thirdSnapshotListener.onResponse(null);
            return;
        }
        master.client.admin().cluster()
            .prepareCreateSnapshot(repositoryName, firstSnapshotName).setWaitForCompletion(true)
            .execute(thirdSnapshotListener);
        continueOrDie(thirdSnapshotListener, createSnapshotResponse ->
            assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
    });

    deterministicTaskQueue.runAllRunnableTasks();

    final CreateSnapshotResponse thirdSnapshotResponse = thirdSnapshotListener.result();

    // No snapshot may be left in an incomplete state once the task queue has drained.
    SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
    assertTrue(snapshotsInProgress.entries().stream().allMatch(entry -> entry.state().completed()));

    // Two snapshots remain when the delete failed, three when it succeeded.
    final Repository repository = master.repositoriesService.repository(repositoryName);
    Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
    assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3));

    // Every surviving snapshot must have completed successfully across all shards.
    for (SnapshotId snapshotId : snapshotIds) {
        final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
        assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
        assertThat(snapshotInfo.indices(), containsInAnyOrder(indexName));
        assertEquals(shardCount, snapshotInfo.successfulShards());
        assertEquals(0, snapshotInfo.failedShards());
    }
}

/**
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
* deleting a snapshot.
Expand Down

0 comments on commit 6685581

Please sign in to comment.