From 22679c793211479d07c61924684a5750fde9e50d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 5 Oct 2019 15:01:06 +0200 Subject: [PATCH] Fix Snapshot Corruption in Edge Case (#47552) (#47620) This fixes missing to marking shard snapshots as failures when multiple data-nodes are lost during the snapshot process or shard snapshot failures have occured before a node left the cluster. The problem was that we were simply not adding any shard entries for completed shards on node-left events. This has no effect for a successful shard, but for a failed shard would lead to that shard not being marked as failed during snapshot finalization. Fixed by corectly keeping track of all previous completed shard states as well in this case. Also, added an assertion that without this fix would trip on almost every run of the resiliency tests and adjusted the serialization of SnapshotsInProgress.Entry so we have a proper assertion message. Closes #47550 --- .../cluster/SnapshotsInProgress.java | 86 ++++++++++--------- .../snapshots/SnapshotsService.java | 13 ++- 2 files changed, 56 insertions(+), 43 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index a4e585c2f0528..6ae6a8109ddd0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -76,7 +77,7 @@ public String toString() { return builder.append("]").toString(); } - public static class Entry { + public static class Entry implements ToXContent { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -210,7 +211,50 @@ public int hashCode() { @Override public String toString() { - return snapshot.toString(); + return Strings.toString(this); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(REPOSITORY, snapshot.getRepository()); + builder.field(SNAPSHOT, snapshot.getSnapshotId().getName()); + builder.field(UUID, snapshot.getSnapshotId().getUUID()); + builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState()); + builder.field(PARTIAL, partial); + builder.field(STATE, state); + builder.startArray(INDICES); + { + for (IndexId index : indices) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime)); + builder.field(REPOSITORY_STATE_ID, repositoryStateId); + builder.startArray(SHARDS); + { + for (ObjectObjectCursor shardEntry : shards) { + ShardId shardId = shardEntry.key; + ShardSnapshotStatus status = shardEntry.value; + builder.startObject(); + { + builder.field(INDEX, shardId.getIndex()); + builder.field(SHARD, shardId.getId()); + builder.field(STATE, status.state()); + builder.field(NODE, status.nodeId()); + } + builder.endObject(); + } + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return false; } private ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { @@ -507,48 +551,12 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startArray(SNAPSHOTS); for (Entry entry : entries) { - toXContent(entry, builder, params); + entry.toXContent(builder, params); } builder.endArray(); return builder; } - public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field(REPOSITORY, entry.snapshot().getRepository()); - builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); - builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); - builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); - builder.field(PARTIAL, entry.partial()); - builder.field(STATE, entry.state()); - builder.startArray(INDICES); - { - for (IndexId index : entry.indices()) { - index.toXContent(builder, params); - } - } - builder.endArray(); - builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); - builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); - builder.startArray(SHARDS); - { - for (ObjectObjectCursor shardEntry : entry.shards) { - ShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.getIndex()); - builder.field(SHARD, shardId.getId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); - } - } - builder.endArray(); - builder.endObject(); - } - public enum ShardState { INIT((byte) 0, false, false), SUCCESS((byte) 2, true, false), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index b1823a9b3117a..717efda1b7998 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -792,18 +792,21 @@ public ClusterState execute(ClusterState currentState) { ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); boolean snapshotChanged = false; for (ObjectObjectCursor shardEntry : snapshot.shards()) { - ShardSnapshotStatus shardStatus = shardEntry.value; + final ShardSnapshotStatus shardStatus = shardEntry.value; + final ShardId shardId = shardEntry.key; if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardEntry.key, shardEntry.value); + shards.put(shardId, shardStatus); } else { // TODO: Restart snapshot on another node? snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on closed node [{}]", - shardEntry.key, shardStatus.nodeId()); - shards.put(shardEntry.key, + shardId, shardStatus.nodeId()); + shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown")); } + } else { + shards.put(shardId, shardStatus); } } if (snapshotChanged) { @@ -835,6 +838,8 @@ public void onFailure(Exception e) { } }, updatedSnapshot.getRepositoryStateId(), false); } + assert updatedSnapshot.shards().size() == snapshot.shards().size() + : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; } if (changed) { return ClusterState.builder(currentState)