Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid concurrent snapshot finalizations when deleting an INIT snapshot #28078

Merged
merged 2 commits into from
Jan 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,26 +372,32 @@ private void beginSnapshot(final ClusterState clusterState,
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
boolean accepted = false;
SnapshotsInProgress.Entry updatedSnapshot;

SnapshotsInProgress.Entry endSnapshot;
String failure = null;

@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
// Replace the snapshot that was just created
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
entries.add(entry);
continue;
}

if (entry.state() != State.ABORTED) {
// Replace the snapshot that was just intialized
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
StringBuilder failureMessage = new StringBuilder();
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(updatedSnapshot);
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
entries.add(endSnapshot);

final StringBuilder failureMessage = new StringBuilder();
if (missing.isEmpty() == false) {
failureMessage.append("Indices don't have primary shards ");
failureMessage.append(missing);
Expand All @@ -407,13 +413,16 @@ public ClusterState execute(ClusterState currentState) {
continue;
}
}
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
entries.add(updatedSnapshot);
if (!completed(shards.values())) {
accepted = true;
if (completed(shards.values())) {
endSnapshot = updatedSnapshot;
}
} else {
entries.add(entry);
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
failure = "snapshot was aborted during initialization";
endSnapshot = entry;
entries.add(endSnapshot);
}
}
return ClusterState.builder(currentState)
Expand Down Expand Up @@ -448,8 +457,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
// go ahead and continue working on this snapshot rather then end here.
if (!accepted && updatedSnapshot != null) {
endSnapshot(updatedSnapshot, failure);
if (endSnapshot != null) {
endSnapshot(endSnapshot, failure);
}
}
});
Expand Down Expand Up @@ -750,6 +759,11 @@ public ClusterState execute(ClusterState currentState) throws Exception {
}
entries.add(updatedSnapshot);
} else if (snapshot.state() == State.INIT && newMaster) {
changed = true;
// Mark the snapshot as aborted as it failed to start from the previous master
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
entries.add(updatedSnapshot);

// Clean up the snapshot that failed to start from the old master
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
@Override
Expand Down Expand Up @@ -935,7 +949,7 @@ private Tuple<Set<String>, Set<String>> indicesWithMissingShards(ImmutableOpenMa
*
* @param entry snapshot
*/
void endSnapshot(SnapshotsInProgress.Entry entry) {
void endSnapshot(final SnapshotsInProgress.Entry entry) {
endSnapshot(entry, null);
}

Expand Down Expand Up @@ -1144,24 +1158,26 @@ public ClusterState execute(ClusterState currentState) throws Exception {
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
// snapshot is currently running - stop started shards
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();

final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;

final State state = snapshotEntry.state();
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();

} else if (state == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (!status.state().completed()) {
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
"aborted by snapshot deletion"));
} else {
shardsBuilder.put(shardEntry.key, status);
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
}
shardsBuilder.put(shardEntry.key, status);
}
shards = shardsBuilder.build();
} else if (snapshotEntry.state() == State.INIT) {
// snapshot hasn't started yet - end it
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);

} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
Expand All @@ -1178,7 +1194,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for - finish the snapshot
// no shards to wait for but a node is gone - this is the only case
// where we force to finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshotEntry.shards();
endSnapshot(snapshotEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3151,7 +3151,7 @@ public void testSnapshottingWithMissingSequenceNumbers() {
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27974")
@TestLogging("org.elasticsearch.snapshots:TRACE")
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
final Client client = client();

Expand All @@ -3163,11 +3163,11 @@ public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
));

createIndex("test-idx");
final int nbDocs = scaledRandomIntBetween(1, 100);
final int nbDocs = scaledRandomIntBetween(100, 500);
for (int i = 0; i < nbDocs; i++) {
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
flushAndRefresh("test-idx");
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));

// Create a snapshot
Expand Down