diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index a4e585c2f0528..6ae6a8109ddd0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -25,6 +25,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -76,7 +77,7 @@ public String toString() { return builder.append("]").toString(); } - public static class Entry { + public static class Entry implements ToXContent { private final State state; private final Snapshot snapshot; private final boolean includeGlobalState; @@ -210,7 +211,50 @@ public int hashCode() { @Override public String toString() { - return snapshot.toString(); + return Strings.toString(this); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(REPOSITORY, snapshot.getRepository()); + builder.field(SNAPSHOT, snapshot.getSnapshotId().getName()); + builder.field(UUID, snapshot.getSnapshotId().getUUID()); + builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState()); + builder.field(PARTIAL, partial); + builder.field(STATE, state); + builder.startArray(INDICES); + { + for (IndexId index : indices) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(startTime)); + builder.field(REPOSITORY_STATE_ID, repositoryStateId); + builder.startArray(SHARDS); + { + for (ObjectObjectCursor shardEntry : shards) { + ShardId shardId = shardEntry.key; + ShardSnapshotStatus status = shardEntry.value; + builder.startObject(); + { + builder.field(INDEX, shardId.getIndex()); + builder.field(SHARD, shardId.getId()); + builder.field(STATE, status.state()); + builder.field(NODE, status.nodeId()); + } + builder.endObject(); + } + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return false; } private ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { @@ -507,48 +551,12 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { builder.startArray(SNAPSHOTS); for (Entry entry : entries) { - toXContent(entry, builder, params); + entry.toXContent(builder, params); } builder.endArray(); return builder; } - public void toXContent(Entry entry, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject(); - builder.field(REPOSITORY, entry.snapshot().getRepository()); - builder.field(SNAPSHOT, entry.snapshot().getSnapshotId().getName()); - builder.field(UUID, entry.snapshot().getSnapshotId().getUUID()); - builder.field(INCLUDE_GLOBAL_STATE, entry.includeGlobalState()); - builder.field(PARTIAL, entry.partial()); - builder.field(STATE, entry.state()); - builder.startArray(INDICES); - { - for (IndexId index : entry.indices()) { - index.toXContent(builder, params); - } - } - builder.endArray(); - builder.humanReadableField(START_TIME_MILLIS, START_TIME, new TimeValue(entry.startTime())); - builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); - builder.startArray(SHARDS); - { - for (ObjectObjectCursor shardEntry : entry.shards) { - ShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.getIndex()); - builder.field(SHARD, shardId.getId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); - } - } - builder.endArray(); - builder.endObject(); - } - public enum ShardState { INIT((byte) 0, false, false), SUCCESS((byte) 2, true, false), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index e9b7915cf98ed..d038262fd4d2d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -764,18 +764,21 @@ public ClusterState execute(ClusterState currentState) { ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); boolean snapshotChanged = false; for (ObjectObjectCursor shardEntry : snapshot.shards()) { - ShardSnapshotStatus shardStatus = shardEntry.value; + final ShardSnapshotStatus shardStatus = shardEntry.value; + final ShardId shardId = shardEntry.key; if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardEntry.key, shardEntry.value); + shards.put(shardId, shardStatus); } else { // TODO: Restart snapshot on another node? snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on closed node [{}]", - shardEntry.key, shardStatus.nodeId()); - shards.put(shardEntry.key, + shardId, shardStatus.nodeId()); + shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown")); } + } else { + shards.put(shardId, shardStatus); } } if (snapshotChanged) { @@ -807,6 +810,8 @@ public void onFailure(Exception e) { } }, updatedSnapshot.getRepositoryStateId(), false); } + assert updatedSnapshot.shards().size() == snapshot.shards().size() + : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; } if (changed) { return ClusterState.builder(currentState)