Skip to content

Commit

Permalink
Fix Snapshot State Machine Bug in Concurrent Clone and Delete (#75068) (
Browse files Browse the repository at this point in the history
#75285)

Unfortunately obvious bug when multiple deletes are queued up together with
a clone operation that was waiting for one of those deletes.
  • Loading branch information
original-brownbear authored Jul 13, 2021
1 parent 59ab690 commit 714ccf3
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
Expand All @@ -45,6 +48,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand All @@ -58,6 +62,7 @@
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -1366,6 +1371,81 @@ public void testStartWithSuccessfulShardSnapshotPendingFinalization() throws Exc
assertSuccessful(otherSnapshot);
}

public void testConcurrentRestoreDeleteAndClone() throws Exception {
final String repository = "test-repo";
createRepository(logger, repository, "fs");

final int nbIndices = randomIntBetween(10, 20);

for (int i = 0; i < nbIndices; i++) {
final String index = "index-" + i;
createIndexWithContent(index);
final String snapshot = "snapshot-" + i;
createSnapshot(repository, snapshot, org.elasticsearch.core.List.of(index));
}

final List<ActionFuture<AcknowledgedResponse>> cloneFutures = new ArrayList<>();
final List<ActionFuture<RestoreSnapshotResponse>> restoreFutures = new ArrayList<>();

for (int i = 0; i < nbIndices; i++) {
if (randomBoolean()) {
restoreFutures.add(
client().admin()
.cluster()
.prepareRestoreSnapshot(repository, "snapshot-" + i)
.setIndices("index-" + i)
.setRenamePattern("(.+)")
.setRenameReplacement("$1-restored-" + i)
.setWaitForCompletion(true)
.execute()
);
} else {
cloneFutures.add(
client().admin()
.cluster()
.prepareCloneSnapshot(repository, "snapshot-" + i, "clone-" + i)
.setIndices("index-" + i)
.execute()
);
}
}

// make deletes and clones complete concurrently
if (cloneFutures.isEmpty() == false) {
awaitNumberOfSnapshotsInProgress(cloneFutures.size());
}
if (restoreFutures.isEmpty() == false) {
awaitClusterState(state -> state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false);
}
final List<ActionFuture<AcknowledgedResponse>> deleteFutures = new ArrayList<>(nbIndices);
for (int i = 0; i < nbIndices; i++) {
deleteFutures.add(startDeleteSnapshot(repository, "snapshot-" + i));
}

for (ActionFuture<RestoreSnapshotResponse> operation : restoreFutures) {
try {
final RestoreInfo restoreResponse = operation.get().getRestoreInfo();
assertThat(restoreResponse.successfulShards(), greaterThanOrEqualTo(1));
assertEquals(0, restoreResponse.failedShards());
} catch (ExecutionException e) {
final Throwable csee = ExceptionsHelper.unwrap(e, ConcurrentSnapshotExecutionException.class);
assertThat(csee, instanceOf(ConcurrentSnapshotExecutionException.class));
}
}
for (ActionFuture<AcknowledgedResponse> operation : cloneFutures) {
assertAcked(operation.get());
}
for (ActionFuture<AcknowledgedResponse> operation : deleteFutures) {
try {
assertAcked(operation.get());
} catch (ExecutionException e) {
final Throwable csee = ExceptionsHelper.unwrap(e, ConcurrentSnapshotExecutionException.class);
assertThat(csee, instanceOf(ConcurrentSnapshotExecutionException.class));
}
}
awaitNoMoreRunningOperations();
}

private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
final SnapshotsStatusResponse snapshotsStatusResponse = client().admin()
.cluster()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.endArray();
builder.timeField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("state", entry.state);
}
builder.endObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,11 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) {
: "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete";
} else if (value.value.isActive()) {
assert reposWithRunningDelete.contains(entry.repository()) == false
: "Found shard snapshot actively executing in [" + entry + "] when it should be blocked by a running delete";
: "Found shard snapshot actively executing in ["
+ entry
+ "] when it should be blocked by a running delete ["
+ Strings.toString(snapshotDeletionsInProgress)
+ "]";
}
}
}
Expand Down Expand Up @@ -2772,7 +2776,7 @@ private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) {
// Entry is writing to the repo because it's finalizing on master
return true;
}
for (ObjectCursor<ShardSnapshotStatus> value : entry.shards().values()) {
for (ObjectCursor<ShardSnapshotStatus> value : (entry.isClone() ? entry.clones() : entry.shards()).values()) {
if (value.value.isActive()) {
// Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public void testXContent() throws IOException {
String json = Strings.toString(builder);
assertThat(json,
equalTo("{\"snapshot_deletions\":[{\"repository\":\"repo\",\"snapshots\":[]," +
"\"start_time\":\"1993-05-06T13:17:47.638Z\",\"start_time_millis\":736694267638,\"repository_state_id\":0}]}"));
"\"start_time\":\"1993-05-06T13:17:47.638Z\",\"start_time_millis\":736694267638,\"repository_state_id\":0," +
"\"state\":\"STARTED\"}]}"));
}
}
}

0 comments on commit 714ccf3

Please sign in to comment.