diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index f2a62d82995b7..95b9f06e12e5e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -68,14 +68,6 @@ private static boolean assertNoConcurrentDeletionsForSameRepository(List return true; } - /** - * Returns a new instance of {@link SnapshotDeletionsInProgress} with the given - * {@link Entry} added. - */ - public static SnapshotDeletionsInProgress newInstance(Entry entry) { - return new SnapshotDeletionsInProgress(Collections.singletonList(entry)); - } - /** * Returns a new instance of {@link SnapshotDeletionsInProgress} which adds * the given {@link Entry} to the invoking instance. diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index cfb74f0603b0b..d7453929467d6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -55,6 +55,67 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; + private final List entries; + + public static SnapshotsInProgress of(List entries) { + if (entries.isEmpty()) { + return EMPTY; + } + return new SnapshotsInProgress(Collections.unmodifiableList(entries)); + } + + public SnapshotsInProgress(StreamInput in) throws IOException { + this(in.readList(SnapshotsInProgress.Entry::new)); + } + + private SnapshotsInProgress(List entries) { + this.entries = entries; + assert assertConsistentEntries(entries); + } + + public List entries() { + return this.entries; + } + + public Entry snapshot(final Snapshot snapshot) { + for (Entry entry : entries) { + final Snapshot curr = entry.snapshot(); + if (curr.equals(snapshot)) { + return entry; + } + } + return null; + } + + @Override + public String getWriteableName() { + return TYPE; + } + + @Override + public Version getMinimalSupportedVersion() { + return Version.CURRENT.minimumCompatibilityVersion(); + } + + public static NamedDiff readDiffFrom(StreamInput in) throws IOException { + return readDiffFrom(Custom.class, TYPE, in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(entries); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startArray("snapshots"); + for (Entry entry : entries) { + entry.toXContent(builder, params); + } + builder.endArray(); + return builder; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -84,8 +145,9 @@ public String toString() { * will be in state {@link State#SUCCESS} right away otherwise it will be in state {@link State#STARTED}. */ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState, boolean partial, List indices, - List dataStreams, long startTime, long repositoryStateId, ImmutableOpenMap shards, - Map userMetadata, Version version, List featureStates) { + List dataStreams, long startTime, long repositoryStateId, + ImmutableOpenMap shards, + Map userMetadata, Version version, List featureStates) { return new SnapshotsInProgress.Entry(snapshot, includeGlobalState, partial, completed(shards.values()) ? State.SUCCESS : State.STARTED, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, null, userMetadata, version); @@ -105,795 +167,707 @@ public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState, public static Entry startClone(Snapshot snapshot, SnapshotId source, List indices, long startTime, long repositoryStateId, Version version) { return new SnapshotsInProgress.Entry(snapshot, true, false, State.STARTED, indices, Collections.emptyList(), - Collections.emptyList(), startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, source, - ImmutableOpenMap.of()); + Collections.emptyList(), startTime, repositoryStateId, ImmutableOpenMap.of(), null, Collections.emptyMap(), version, + source, ImmutableOpenMap.of()); } - public static class Entry implements Writeable, ToXContent, RepositoryOperation { - private final State state; - private final Snapshot snapshot; - private final boolean includeGlobalState; - private final boolean partial; - /** - * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. - */ - private final ImmutableOpenMap shards; - private final List indices; - private final List dataStreams; - private final List featureStates; - private final long startTime; - private final long repositoryStateId; - // see #useShardGenerations - private final Version version; - - /** - * Source snapshot if this is a clone operation or {@code null} if this is a snapshot. - */ - @Nullable - private final SnapshotId source; - - /** - * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry - * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries. - */ - private final ImmutableOpenMap clones; - - @Nullable private final Map userMetadata; - @Nullable private final String failure; - - // visible for testing, use #startedEntry and copy constructors in production code - public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - List dataStreams, List featureStates, long startTime, long repositoryStateId, - ImmutableOpenMap shards, String failure, Map userMetadata, - Version version) { - this(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, - failure, userMetadata, version, null, ImmutableOpenMap.of()); - } - - private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - List dataStreams, List featureStates, long startTime, long repositoryStateId, - ImmutableOpenMap shards, String failure, Map userMetadata, - Version version, @Nullable SnapshotId source, - @Nullable ImmutableOpenMap clones) { - this.state = state; - this.snapshot = snapshot; - this.includeGlobalState = includeGlobalState; - this.partial = partial; - this.indices = indices; - this.dataStreams = dataStreams; - this.featureStates = Collections.unmodifiableList(featureStates); - this.startTime = startTime; - this.shards = shards; - this.repositoryStateId = repositoryStateId; - this.failure = failure; - this.userMetadata = userMetadata; - this.version = version; - this.source = source; - if (source == null) { - assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source"; - this.clones = ImmutableOpenMap.of(); - } else { - this.clones = clones; + /** + * Checks if all shards in the list have completed + * + * @param shards list of shard statuses + * @return true if all shards have completed (either successfully or failed), false otherwise + */ + public static boolean completed(ObjectContainer shards) { + for (ObjectCursor status : shards) { + if (status.value.state().completed == false) { + return false; } - assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } + return true; + } - private Entry(StreamInput in) throws IOException { - snapshot = new Snapshot(in); - includeGlobalState = in.readBoolean(); - partial = in.readBoolean(); - state = State.fromValue(in.readByte()); - indices = in.readList(IndexId::new); - startTime = in.readLong(); - shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom); - repositoryStateId = in.readLong(); - failure = in.readOptionalString(); - userMetadata = in.readMap(); - version = Version.readVersion(in); - dataStreams = in.readStringList(); - if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - source = in.readOptionalWriteable(SnapshotId::new); - clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); - } else { - source = null; - clones = ImmutableOpenMap.of(); - } - if (in.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { - featureStates = Collections.unmodifiableList(in.readList(SnapshotFeatureInfo::new)); - } else { - featureStates = Collections.emptyList(); + private static boolean hasFailures(ImmutableOpenMap clones) { + for (ObjectCursor value : clones.values()) { + if (value.value.state().failed()) { + return true; } } + return false; + } - private static boolean assertShardsConsistent(SnapshotId source, State state, List indices, - ImmutableOpenMap shards, - ImmutableOpenMap clones) { - if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { - return true; - } - final Set indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet()); - final Set indexNamesInShards = new HashSet<>(); - shards.iterator().forEachRemaining(s -> { - indexNamesInShards.add(s.key.getIndexName()); - assert source == null || s.value.nodeId == null : - "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]"; - }); - assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed"; - assert source != null || indexNames.equals(indexNamesInShards) - : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]"; - final boolean shardsCompleted = completed(shards.values()) && completed(clones.values()); - // Check state consistency for normal snapshots and started clone operations - if (source == null || clones.isEmpty() == false) { - assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false) - : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards; + private static boolean assertConsistentEntries(List entries) { + final Map>> assignedShardsByRepo = new HashMap<>(); + final Map>> queuedShardsByRepo = new HashMap<>(); + for (Entry entry : entries) { + for (ObjectObjectCursor shard : entry.shards()) { + final ShardId sid = shard.key; + assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.getIndexName(), sid.id(), + shard.value); } - if (source != null && state.completed()) { - assert hasFailures(clones) == false || state == State.FAILED - : "Failed shard clones in [" + clones + "] but state was [" + state + "]"; + for (ObjectObjectCursor shard : entry.clones()) { + final RepositoryShardId sid = shard.key; + assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.indexName(), sid.shardId(), + shard.value); } - return true; } - - public Entry withRepoGen(long newRepoGen) { - assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen - + "] must be higher than current generation [" + repositoryStateId + "]"; - return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, newRepoGen, - shards, failure, userMetadata, version, source, clones); + for (String repoName : assignedShardsByRepo.keySet()) { + // make sure in-flight-shard-states can be built cleanly for the entries without tripping assertions + InFlightShardSnapshotStates.forRepo(repoName, entries); } + return true; + } - public Entry withClones(ImmutableOpenMap updatedClones) { - if (updatedClones.equals(clones)) { - return this; - } - return new Entry(snapshot, includeGlobalState, partial, - completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) : - state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, failure, userMetadata, - version, source, updatedClones); + private static boolean assertShardStateConsistent(List entries, Map>> assignedShardsByRepo, + Map>> queuedShardsByRepo, Entry entry, + String indexName, int shardId, ShardSnapshotStatus shardSnapshotStatus) { + if (shardSnapshotStatus.isActive()) { + Tuple plainShardId = Tuple.tuple(indexName, shardId); + assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()) + .add(plainShardId) : "Found duplicate shard assignments in " + entries; + assert queuedShardsByRepo.getOrDefault(entry.repository(), Collections.emptySet()).contains(plainShardId) == false + : "Found active shard assignments after queued shard assignments in " + entries; + } else if (shardSnapshotStatus.state() == ShardState.QUEUED) { + queuedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(Tuple.tuple(indexName, shardId)); } + return true; + } + public enum ShardState { + INIT((byte) 0, false, false), + SUCCESS((byte) 2, true, false), + FAILED((byte) 3, true, true), + ABORTED((byte) 4, false, true), + MISSING((byte) 5, true, true), /** - * Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a - * data node or to {@link ShardState#FAILED} if not assigned to any data node. - * If the instance had no in-progress shard snapshots assigned to data nodes it's moved to state {@link State#SUCCESS}, otherwise - * it's moved to state {@link State#ABORTED}. - * In the special case where this instance has not yet made any progress on any shard this method just returns - * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright. - * - * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly + * Shard snapshot is waiting for the primary to snapshot to become available. */ - @Nullable - public Entry abort() { - final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); - boolean completed = true; - boolean allQueued = true; - for (ObjectObjectCursor shardEntry : shards) { - ShardSnapshotStatus status = shardEntry.value; - allQueued &= status.state() == ShardState.QUEUED; - if (status.state().completed() == false) { - final String nodeId = status.nodeId(); - status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED, - "aborted by snapshot deletion", status.generation()); - } - completed &= status.state().completed(); - shardsBuilder.put(shardEntry.key, status); - } - if (allQueued) { - return null; - } - return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, ABORTED_FAILURE_TEXT); - } - - public Entry fail(ImmutableOpenMap shards, State state, String failure) { - return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, - repositoryStateId, shards, failure, userMetadata, version, source, clones); - } - + WAITING((byte) 6, false, false), /** - * Create a new instance that has its shard assignments replaced by the given shard assignment map. - * If the given shard assignments show all shard snapshots in a completed state then the returned instance will be of state - * {@link State#SUCCESS}, otherwise the state remains unchanged. - * - * @param shards new shard snapshot states - * @return new snapshot entry + * Shard snapshot is waiting for another shard snapshot for the same shard and to the same repository to finish. */ - public Entry withShardStates(ImmutableOpenMap shards) { - if (completed(shards.values())) { - return new Entry(snapshot, includeGlobalState, partial, State.SUCCESS, indices, dataStreams, featureStates, - startTime, repositoryStateId, shards, failure, userMetadata, version); - } - return withStartedShards(shards); - } + QUEUED((byte) 7, false, false); - /** - * Same as {@link #withShardStates} but does not check if the snapshot completed and thus is only to be used when starting new - * shard snapshots on data nodes for a running snapshot. - */ - public Entry withStartedShards(ImmutableOpenMap shards) { - final SnapshotsInProgress.Entry updated = new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, - featureStates, startTime, repositoryStateId, shards, failure, userMetadata, version); - assert updated.state().completed() == false && completed(updated.shards().values()) == false - : "Only running snapshots allowed but saw [" + updated + "]"; - return updated; + private final byte value; + + private final boolean completed; + + private final boolean failed; + + ShardState(byte value, boolean completed, boolean failed) { + this.value = value; + this.completed = completed; + this.failed = failed; } - @Override - public String repository() { - return snapshot.getRepository(); + public boolean completed() { + return completed; } - public Snapshot snapshot() { - return this.snapshot; + public boolean failed() { + return failed; } - public ImmutableOpenMap shards() { - return this.shards; + public static ShardState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 2: + return SUCCESS; + case 3: + return FAILED; + case 4: + return ABORTED; + case 5: + return MISSING; + case 6: + return WAITING; + case 7: + return QUEUED; + default: + throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]"); + } } + } - public State state() { - return state; + public enum State { + INIT((byte) 0, false), + STARTED((byte) 1, false), + SUCCESS((byte) 2, true), + FAILED((byte) 3, true), + ABORTED((byte) 4, false); + + private final byte value; + + private final boolean completed; + + State(byte value, boolean completed) { + this.value = value; + this.completed = completed; } - public List indices() { - return indices; + public byte value() { + return value; } - public boolean includeGlobalState() { - return includeGlobalState; + public boolean completed() { + return completed; } - public Map userMetadata() { - return userMetadata; + public static State fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return STARTED; + case 2: + return SUCCESS; + case 3: + return FAILED; + case 4: + return ABORTED; + default: + throw new IllegalArgumentException("No snapshot state for value [" + value + "]"); + } } + } - public boolean partial() { - return partial; + public static class ShardSnapshotStatus implements Writeable { + + /** + * Shard snapshot status for shards that are waiting for another operation to finish before they can be assigned to a node. + */ + public static final ShardSnapshotStatus UNASSIGNED_QUEUED = + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.QUEUED, null); + + /** + * Shard snapshot status for shards that could not be snapshotted because their index was deleted from before the shard snapshot + * started. + */ + public static final ShardSnapshotStatus MISSING = + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index", null); + + private final ShardState state; + + @Nullable + private final String nodeId; + + @Nullable + private final String generation; + + @Nullable + private final String reason; + + public ShardSnapshotStatus(String nodeId, String generation) { + this(nodeId, ShardState.INIT, generation); } - public long startTime() { - return startTime; + public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, @Nullable String generation) { + this(nodeId, state, null, generation); } - public List dataStreams() { - return dataStreams; + public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, String reason, @Nullable String generation) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + this.generation = generation; + assert assertConsistent(); } - public List featureStates() { - return featureStates; + private boolean assertConsistent() { + // If the state is failed we have to have a reason for this failure + assert state.failed() == false || reason != null; + assert (state != ShardState.INIT && state != ShardState.WAITING) || nodeId != null : "Null node id for state [" + state + "]"; + assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null) : + "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + reason + "]"; + return true; } - @Override - public long repositoryStateId() { - return repositoryStateId; + public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { + String nodeId = in.readOptionalString(); + final ShardState state = ShardState.fromValue(in.readByte()); + final String generation = in.readOptionalString(); + final String reason = in.readOptionalString(); + if (state == ShardState.QUEUED) { + return UNASSIGNED_QUEUED; + } + return new ShardSnapshotStatus(nodeId, state, reason, generation); } - public String failure() { - return failure; + public ShardState state() { + return state; } - /** - * What version of metadata to use for the snapshot in the repository - */ - public Version version() { - return version; + @Nullable + public String nodeId() { + return nodeId; } @Nullable - public SnapshotId source() { - return source; + public String generation() { + return this.generation; } - public boolean isClone() { - return source != null; + public String reason() { + return reason; } - public ImmutableOpenMap clones() { - return clones; + /** + * Checks if this shard snapshot is actively executing. + * A shard is defined as actively executing if it either is in a state that may write to the repository + * ({@link ShardState#INIT} or {@link ShardState#ABORTED}) or about to write to it in state {@link ShardState#WAITING}. + */ + public boolean isActive() { + return state == ShardState.INIT || state == ShardState.ABORTED || state == ShardState.WAITING; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalString(nodeId); + out.writeByte(state.value); + out.writeOptionalString(generation); + out.writeOptionalString(reason); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - - Entry entry = (Entry) o; - - if (includeGlobalState != entry.includeGlobalState) return false; - if (partial != entry.partial) return false; - if (startTime != entry.startTime) return false; - if (indices.equals(entry.indices) == false) return false; - if (dataStreams.equals(entry.dataStreams) == false) return false; - if (shards.equals(entry.shards) == false) return false; - if (snapshot.equals(entry.snapshot) == false) return false; - if (state != entry.state) return false; - if (repositoryStateId != entry.repositoryStateId) return false; - if (Objects.equals(failure, ((Entry) o).failure) == false) return false; - if (Objects.equals(userMetadata, ((Entry) o).userMetadata) == false) return false; - if (version.equals(entry.version) == false) return false; - if (Objects.equals(source, ((Entry) o).source) == false) return false; - if (clones.equals(((Entry) o).clones) == false) return false; - if (featureStates.equals(entry.featureStates) == false) return false; - - return true; + ShardSnapshotStatus status = (ShardSnapshotStatus) o; + return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) + && Objects.equals(generation, status.generation) && state == status.state; } @Override public int hashCode() { - int result = state.hashCode(); - result = 31 * result + snapshot.hashCode(); - result = 31 * result + (includeGlobalState ? 1 : 0); - result = 31 * result + (partial ? 1 : 0); - result = 31 * result + shards.hashCode(); - result = 31 * result + indices.hashCode(); - result = 31 * result + dataStreams.hashCode(); - result = 31 * result + Long.hashCode(startTime); - result = 31 * result + Long.hashCode(repositoryStateId); - result = 31 * result + (failure == null ? 0 : failure.hashCode()); - result = 31 * result + (userMetadata == null ? 0 : userMetadata.hashCode()); - result = 31 * result + version.hashCode(); - result = 31 * result + (source == null ? 0 : source.hashCode()); - result = 31 * result + clones.hashCode(); - result = 31 * result + featureStates.hashCode(); + int result = state != null ? state.hashCode() : 0; + result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); + result = 31 * result + (reason != null ? reason.hashCode() : 0); + result = 31 * result + (generation != null ? generation.hashCode() : 0); return result; } @Override public String toString() { - return Strings.toString(this); - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.field(REPOSITORY, snapshot.getRepository()); - builder.field(SNAPSHOT, snapshot.getSnapshotId().getName()); - builder.field(UUID, snapshot.getSnapshotId().getUUID()); - builder.field(INCLUDE_GLOBAL_STATE, includeGlobalState()); - builder.field(PARTIAL, partial); - builder.field(STATE, state); - builder.startArray(INDICES); - { - for (IndexId index : indices) { - index.toXContent(builder, params); - } - } - builder.endArray(); - builder.timeField(START_TIME_MILLIS, START_TIME, startTime); - builder.field(REPOSITORY_STATE_ID, repositoryStateId); - builder.startArray(SHARDS); - { - for (ObjectObjectCursor shardEntry : shards) { - ShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.getIndex()); - builder.field(SHARD, shardId.getId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); - } - } - builder.endArray(); - builder.startArray(FEATURE_STATES); - { - for (SnapshotFeatureInfo featureState : featureStates) { - featureState.toXContent(builder, params); - } - } - builder.endArray(); - if (isClone()) { - builder.field(SOURCE, source); - builder.startArray(CLONES); - { - for (ObjectObjectCursor shardEntry : clones) { - RepositoryShardId shardId = shardEntry.key; - ShardSnapshotStatus status = shardEntry.value; - builder.startObject(); - { - builder.field(INDEX, shardId.index()); - builder.field(SHARD, shardId.shardId()); - builder.field(STATE, status.state()); - builder.field(NODE, status.nodeId()); - } - builder.endObject(); - } - } - builder.endArray(); - } - builder.array(DATA_STREAMS, dataStreams.toArray(new String[0])); - builder.endObject(); - return builder; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - snapshot.writeTo(out); - out.writeBoolean(includeGlobalState); - out.writeBoolean(partial); - out.writeByte(state.value()); - out.writeList(indices); - out.writeLong(startTime); - out.writeMap(shards); - out.writeLong(repositoryStateId); - out.writeOptionalString(failure); - out.writeMap(userMetadata); - Version.writeVersion(version, out); - out.writeStringCollection(dataStreams); - if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { - out.writeOptionalWriteable(source); - out.writeMap(clones); - } - if (out.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { - out.writeList(featureStates); - } - } - - @Override - public boolean isFragment() { - return false; - } - } - - /** - * Checks if all shards in the list have completed - * - * @param shards list of shard statuses - * @return true if all shards have completed (either successfully or failed), false otherwise - */ - public static boolean completed(ObjectContainer shards) { - for (ObjectCursor status : shards) { - if (status.value.state().completed == false) { - return false; - } - } - return true; - } - - private static boolean hasFailures(ImmutableOpenMap clones) { - for (ObjectCursor value : clones.values()) { - if (value.value.state().failed()) { - return true; - } + return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]"; } - return false; } - public static class ShardSnapshotStatus implements Writeable { - + public static class Entry implements Writeable, ToXContent, RepositoryOperation { + private final State state; + private final Snapshot snapshot; + private final boolean includeGlobalState; + private final boolean partial; /** - * Shard snapshot status for shards that are waiting for another operation to finish before they can be assigned to a node. + * Map of {@link ShardId} to {@link ShardSnapshotStatus} tracking the state of each shard snapshot operation. */ - public static final ShardSnapshotStatus UNASSIGNED_QUEUED = - new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.QUEUED, null); + private final ImmutableOpenMap shards; + private final List indices; + private final List dataStreams; + private final List featureStates; + private final long startTime; + private final long repositoryStateId; + private final Version version; /** - * Shard snapshot status for shards that could not be snapshotted because their index was deleted from before the shard snapshot - * started. + * Source snapshot if this is a clone operation or {@code null} if this is a snapshot. */ - public static final ShardSnapshotStatus MISSING = - new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index", null); - - private final ShardState state; - - @Nullable - private final String nodeId; - @Nullable - private final String generation; + private final SnapshotId source; - @Nullable - private final String reason; + /** + * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry + * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries. + */ + private final ImmutableOpenMap clones; - public ShardSnapshotStatus(String nodeId, String generation) { - this(nodeId, ShardState.INIT, generation); - } + @Nullable private final Map userMetadata; + @Nullable private final String failure; - public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, @Nullable String generation) { - this(nodeId, state, null, generation); + // visible for testing, use #startedEntry and copy constructors in production code + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + List dataStreams, List featureStates, long startTime, long repositoryStateId, + ImmutableOpenMap shards, String failure, Map userMetadata, + Version version) { + this(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, + failure, userMetadata, version, null, ImmutableOpenMap.of()); } - public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, String reason, @Nullable String generation) { - this.nodeId = nodeId; + private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, + List dataStreams, List featureStates, long startTime, long repositoryStateId, + ImmutableOpenMap shards, String failure, Map userMetadata, + Version version, @Nullable SnapshotId source, + @Nullable ImmutableOpenMap clones) { this.state = state; - this.reason = reason; - this.generation = generation; - assert assertConsistent(); + this.snapshot = snapshot; + this.includeGlobalState = includeGlobalState; + this.partial = partial; + this.indices = indices; + this.dataStreams = dataStreams; + this.featureStates = Collections.unmodifiableList(featureStates); + this.startTime = startTime; + this.shards = shards; + this.repositoryStateId = repositoryStateId; + this.failure = failure; + this.userMetadata = userMetadata; + this.version = version; + this.source = source; + if (source == null) { + assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source"; + this.clones = ImmutableOpenMap.of(); + } else { + this.clones = clones; + } + assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); } - private boolean assertConsistent() { - // If the state is failed we have to have a reason for this failure - assert state.failed() == false || reason != null; - assert (state != ShardState.INIT && state != ShardState.WAITING) || nodeId != null : "Null node id for state [" + state + "]"; - assert state != ShardState.QUEUED || (nodeId == null && generation == null && reason == null) : - "Found unexpected non-null values for queued state shard nodeId[" + nodeId + "][" + generation + "][" + reason + "]"; - return true; + private Entry(StreamInput in) throws IOException { + snapshot = new Snapshot(in); + includeGlobalState = in.readBoolean(); + partial = in.readBoolean(); + state = State.fromValue(in.readByte()); + indices = in.readList(IndexId::new); + startTime = in.readLong(); + shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom); + repositoryStateId = in.readLong(); + failure = in.readOptionalString(); + userMetadata = in.readMap(); + version = Version.readVersion(in); + dataStreams = in.readStringList(); + if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { + source = in.readOptionalWriteable(SnapshotId::new); + clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); + } else { + source = null; + clones = ImmutableOpenMap.of(); + } + if (in.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { + featureStates = Collections.unmodifiableList(in.readList(SnapshotFeatureInfo::new)); + } else { + featureStates = Collections.emptyList(); + } } - public static ShardSnapshotStatus readFrom(StreamInput in) throws IOException { - String nodeId = in.readOptionalString(); - final ShardState state = ShardState.fromValue(in.readByte()); - final String generation = in.readOptionalString(); - final String reason = in.readOptionalString(); - if (state == ShardState.QUEUED) { - return UNASSIGNED_QUEUED; + private static boolean assertShardsConsistent(SnapshotId source, State state, List indices, + ImmutableOpenMap shards, + ImmutableOpenMap clones) { + if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { + return true; } - return new ShardSnapshotStatus(nodeId, state, reason, generation); + final Set indexNames = indices.stream().map(IndexId::getName).collect(Collectors.toSet()); + final Set indexNamesInShards = new HashSet<>(); + shards.iterator().forEachRemaining(s -> { + indexNamesInShards.add(s.key.getIndexName()); + assert source == null || s.value.nodeId == null : + "Shard snapshot must not be assigned to data node when copying from snapshot [" + source + "]"; + }); + assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed"; + assert source != null || indexNames.equals(indexNamesInShards) + : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]"; + final boolean shardsCompleted = completed(shards.values()) && completed(clones.values()); + // Check state consistency for normal snapshots and started clone operations + if (source == null || clones.isEmpty() == false) { + assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false) + : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards; + } + if (source != null && state.completed()) { + assert hasFailures(clones) == false || state == State.FAILED + : "Failed shard clones in [" + clones + "] but state was [" + state + "]"; + } + return true; } - public ShardState state() { - return state; + public Entry withRepoGen(long newRepoGen) { + assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen + + "] must be higher than current generation [" + repositoryStateId + "]"; + return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, newRepoGen, + shards, failure, userMetadata, version, source, clones); } - @Nullable - public String nodeId() { - return nodeId; + public Entry withClones(ImmutableOpenMap updatedClones) { + if (updatedClones.equals(clones)) { + return this; + } + return new Entry(snapshot, includeGlobalState, partial, + completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) : + state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, failure, userMetadata, + version, source, updatedClones); } + /** + * Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a + * data node or to {@link ShardState#FAILED} if not assigned to any data node. + * If the instance had no in-progress shard snapshots assigned to data nodes it's moved to state {@link State#SUCCESS}, otherwise + * it's moved to state {@link State#ABORTED}. + * In the special case where this instance has not yet made any progress on any shard this method just returns + * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright. + * + * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly + */ @Nullable - public String generation() { - return this.generation; + public Entry abort() { + final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); + boolean completed = true; + boolean allQueued = true; + for (ObjectObjectCursor shardEntry : shards) { + ShardSnapshotStatus status = shardEntry.value; + allQueued &= status.state() == ShardState.QUEUED; + if (status.state().completed() == false) { + final String nodeId = status.nodeId(); + status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED, + "aborted by snapshot deletion", status.generation()); + } + completed &= status.state().completed(); + shardsBuilder.put(shardEntry.key, status); + } + if (allQueued) { + return null; + } + return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, ABORTED_FAILURE_TEXT); } - public String reason() { - return reason; + public Entry fail(ImmutableOpenMap shards, State state, String failure) { + return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, + repositoryStateId, shards, failure, userMetadata, version, source, clones); } /** - * Checks if this shard snapshot is actively executing. - * A shard is defined as actively executing if it either is in a state that may write to the repository - * ({@link ShardState#INIT} or {@link ShardState#ABORTED}) or about to write to it in state {@link ShardState#WAITING}. + * Create a new instance that has its shard assignments replaced by the given shard assignment map. + * If the given shard assignments show all shard snapshots in a completed state then the returned instance will be of state + * {@link State#SUCCESS}, otherwise the state remains unchanged. + * + * @param shards new shard snapshot states + * @return new snapshot entry */ - public boolean isActive() { - return state == ShardState.INIT || state == ShardState.ABORTED || state == ShardState.WAITING; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalString(nodeId); - out.writeByte(state.value); - out.writeOptionalString(generation); - out.writeOptionalString(reason); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ShardSnapshotStatus status = (ShardSnapshotStatus) o; - return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) - && Objects.equals(generation, status.generation) && state == status.state; + public Entry withShardStates(ImmutableOpenMap shards) { + if (completed(shards.values())) { + return new Entry(snapshot, includeGlobalState, partial, State.SUCCESS, indices, dataStreams, featureStates, + startTime, repositoryStateId, shards, failure, userMetadata, version); + } + return withStartedShards(shards); } - @Override - public int hashCode() { - int result = state != null ? state.hashCode() : 0; - result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); - result = 31 * result + (reason != null ? reason.hashCode() : 0); - result = 31 * result + (generation != null ? generation.hashCode() : 0); - return result; + /** + * Same as {@link #withShardStates} but does not check if the snapshot completed and thus is only to be used when starting new + * shard snapshots on data nodes for a running snapshot. + */ + public Entry withStartedShards(ImmutableOpenMap shards) { + final SnapshotsInProgress.Entry updated = new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, + featureStates, startTime, repositoryStateId, shards, failure, userMetadata, version); + assert updated.state().completed() == false && completed(updated.shards().values()) == false + : "Only running snapshots allowed but saw [" + updated + "]"; + return updated; } @Override - public String toString() { - return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]"; - } - } - - public enum State { - INIT((byte) 0, false), - STARTED((byte) 1, false), - SUCCESS((byte) 2, true), - FAILED((byte) 3, true), - ABORTED((byte) 4, false); - - private final byte value; - - private final boolean completed; - - State(byte value, boolean completed) { - this.value = value; - this.completed = completed; - } - - public byte value() { - return value; - } - - public boolean completed() { - return completed; - } - - public static State fromValue(byte value) { - switch (value) { - case 0: - return INIT; - case 1: - return STARTED; - case 2: - return SUCCESS; - case 3: - return FAILED; - case 4: - return ABORTED; - default: - throw new IllegalArgumentException("No snapshot state for value [" + value + "]"); - } - } - } - - private final List entries; - - private static boolean assertConsistentEntries(List entries) { - final Map>> assignedShardsByRepo = new HashMap<>(); - final Map>> queuedShardsByRepo = new HashMap<>(); - for (Entry entry : entries) { - for (ObjectObjectCursor shard : entry.shards()) { - final ShardId sid = shard.key; - assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.getIndexName(), sid.id(), - shard.value); - } - for (ObjectObjectCursor shard : entry.clones()) { - final RepositoryShardId sid = shard.key; - assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.indexName(), sid.shardId(), - shard.value); - } - } - for (String repoName : assignedShardsByRepo.keySet()) { - // make sure in-flight-shard-states can be built cleanly for the entries without tripping assertions - InFlightShardSnapshotStates.forRepo(repoName, entries); + public String repository() { + return snapshot.getRepository(); } - return true; - } - private static boolean assertShardStateConsistent(List entries, Map>> assignedShardsByRepo, - Map>> queuedShardsByRepo, Entry entry, - String indexName, int shardId, ShardSnapshotStatus shardSnapshotStatus) { - if (shardSnapshotStatus.isActive()) { - Tuple plainShardId = Tuple.tuple(indexName, shardId); - assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()) - .add(plainShardId) : "Found duplicate shard assignments in " + entries; - assert queuedShardsByRepo.getOrDefault(entry.repository(), Collections.emptySet()).contains(plainShardId) == false - : "Found active shard assignments after queued shard assignments in " + entries; - } else if (shardSnapshotStatus.state() == ShardState.QUEUED) { - queuedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(Tuple.tuple(indexName, shardId)); + public Snapshot snapshot() { + return this.snapshot; } - return true; - } - public static SnapshotsInProgress of(List entries) { - if (entries.isEmpty()) { - return EMPTY; + public ImmutableOpenMap shards() { + return this.shards; } - return new SnapshotsInProgress(Collections.unmodifiableList(entries)); - } - private SnapshotsInProgress(List entries) { - this.entries = entries; - assert assertConsistentEntries(entries); - } + public State state() { + return state; + } - public List entries() { - return this.entries; - } + public List indices() { + return indices; + } - public Entry snapshot(final Snapshot snapshot) { - for (Entry entry : entries) { - final Snapshot curr = entry.snapshot(); - if (curr.equals(snapshot)) { - return entry; - } + public boolean includeGlobalState() { + return includeGlobalState; } - return null; - } - @Override - public String getWriteableName() { - return TYPE; - } + public Map userMetadata() { + return userMetadata; + } - @Override - public Version getMinimalSupportedVersion() { - return Version.CURRENT.minimumCompatibilityVersion(); - } + public boolean partial() { + return partial; + } - public static NamedDiff readDiffFrom(StreamInput in) throws IOException { - return readDiffFrom(Custom.class, TYPE, in); - } + public long startTime() { + return startTime; + } - public SnapshotsInProgress(StreamInput in) throws IOException { - this.entries = in.readList(SnapshotsInProgress.Entry::new); - } + public List dataStreams() { + return dataStreams; + } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeList(entries); - } + public List featureStates() { + return featureStates; + } - private static final String REPOSITORY = "repository"; - private static final String SNAPSHOTS = "snapshots"; - private static final String SNAPSHOT = "snapshot"; - private static final String UUID = "uuid"; - private static final String INCLUDE_GLOBAL_STATE = "include_global_state"; - private static final String PARTIAL = "partial"; - private static final String STATE = "state"; - private static final String INDICES = "indices"; - private static final String DATA_STREAMS = "data_streams"; - private static final String SOURCE = "source"; - private static final String CLONES = "clones"; - private static final String START_TIME_MILLIS = "start_time_millis"; - private static final String START_TIME = "start_time"; - private static final String REPOSITORY_STATE_ID = "repository_state_id"; - private static final String SHARDS = "shards"; - private static final String INDEX = "index"; - private static final String SHARD = "shard"; - private static final String NODE = "node"; - private static final String FEATURE_STATES = "feature_states"; + @Override + public long repositoryStateId() { + return repositoryStateId; + } - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startArray(SNAPSHOTS); - for (Entry entry : entries) { - entry.toXContent(builder, params); + public String failure() { + return failure; } - builder.endArray(); - return builder; - } - public enum ShardState { - INIT((byte) 0, false, false), - SUCCESS((byte) 2, true, false), - FAILED((byte) 3, true, true), - ABORTED((byte) 4, false, true), - MISSING((byte) 5, true, true), - /** - * Shard snapshot is waiting for the primary to snapshot to become available. - */ - WAITING((byte) 6, false, false), /** - * Shard snapshot is waiting for another shard snapshot for the same shard and to the same repository to finish. + * What version of metadata to use for the snapshot in the repository */ - QUEUED((byte) 7, false, false); + public Version version() { + return version; + } - private final byte value; + @Nullable + public SnapshotId source() { + return source; + } - private final boolean completed; + public boolean isClone() { + return source != null; + } - private final boolean failed; + public ImmutableOpenMap clones() { + return clones; + } - ShardState(byte value, boolean completed, boolean failed) { - this.value = value; - this.completed = completed; - this.failed = failed; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Entry entry = (Entry) o; + + if (includeGlobalState != entry.includeGlobalState) return false; + if (partial != entry.partial) return false; + if (startTime != entry.startTime) return false; + if (indices.equals(entry.indices) == false) return false; + if (dataStreams.equals(entry.dataStreams) == false) return false; + if (shards.equals(entry.shards) == false) return false; + if (snapshot.equals(entry.snapshot) == false) return false; + if (state != entry.state) return false; + if (repositoryStateId != entry.repositoryStateId) return false; + if (Objects.equals(failure, ((Entry) o).failure) == false) return false; + if (Objects.equals(userMetadata, ((Entry) o).userMetadata) == false) return false; + if (version.equals(entry.version) == false) return false; + if (Objects.equals(source, ((Entry) o).source) == false) return false; + if (clones.equals(((Entry) o).clones) == false) return false; + if (featureStates.equals(entry.featureStates) == false) return false; + + return true; } - public boolean completed() { - return completed; + @Override + public int hashCode() { + int result = state.hashCode(); + result = 31 * result + snapshot.hashCode(); + result = 31 * result + (includeGlobalState ? 1 : 0); + result = 31 * result + (partial ? 1 : 0); + result = 31 * result + shards.hashCode(); + result = 31 * result + indices.hashCode(); + result = 31 * result + dataStreams.hashCode(); + result = 31 * result + Long.hashCode(startTime); + result = 31 * result + Long.hashCode(repositoryStateId); + result = 31 * result + (failure == null ? 0 : failure.hashCode()); + result = 31 * result + (userMetadata == null ? 0 : userMetadata.hashCode()); + result = 31 * result + version.hashCode(); + result = 31 * result + (source == null ? 0 : source.hashCode()); + result = 31 * result + clones.hashCode(); + result = 31 * result + featureStates.hashCode(); + return result; } - public boolean failed() { - return failed; + @Override + public String toString() { + return Strings.toString(this); } - public static ShardState fromValue(byte value) { - switch (value) { - case 0: - return INIT; - case 2: - return SUCCESS; - case 3: - return FAILED; - case 4: - return ABORTED; - case 5: - return MISSING; - case 6: - return WAITING; - case 7: - return QUEUED; - default: - throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]"); + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("repository", snapshot.getRepository()); + builder.field("snapshot", snapshot.getSnapshotId().getName()); + builder.field("uuid", snapshot.getSnapshotId().getUUID()); + builder.field("include_global_state", includeGlobalState()); + builder.field("partial", partial); + builder.field("state", state); + builder.startArray("indices"); + { + for (IndexId index : indices) { + index.toXContent(builder, params); + } + } + builder.endArray(); + builder.timeField("start_time_millis", "start_time", startTime); + builder.field("repository_state_id", repositoryStateId); + builder.startArray("shards"); + { + for (ObjectObjectCursor shardEntry : shards) { + ShardId shardId = shardEntry.key; + writeShardSnapshotStatus(builder, shardId.getIndex(), shardId.getId(), shardEntry.value); + } + } + builder.endArray(); + builder.startArray("feature_states"); + { + for (SnapshotFeatureInfo featureState : featureStates) { + featureState.toXContent(builder, params); + } + } + builder.endArray(); + if (isClone()) { + builder.field("source", source); + builder.startArray("clones"); + { + for (ObjectObjectCursor shardEntry : clones) { + RepositoryShardId shardId = shardEntry.key; + writeShardSnapshotStatus(builder, shardId.index(), shardId.shardId(), shardEntry.value); + } + } + builder.endArray(); + } + builder.array("data_streams", dataStreams.toArray(new String[0])); + builder.endObject(); + return builder; + } + + private void writeShardSnapshotStatus(XContentBuilder builder, ToXContent indexId, int shardId, + ShardSnapshotStatus status) throws IOException { + builder.startObject(); + builder.field("index", indexId); + builder.field("shard", shardId); + builder.field("state", status.state()); + builder.field("node", status.nodeId()); + builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + snapshot.writeTo(out); + out.writeBoolean(includeGlobalState); + out.writeBoolean(partial); + out.writeByte(state.value()); + out.writeList(indices); + out.writeLong(startTime); + out.writeMap(shards); + out.writeLong(repositoryStateId); + out.writeOptionalString(failure); + out.writeMap(userMetadata); + Version.writeVersion(version, out); + out.writeStringCollection(dataStreams); + if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { + out.writeOptionalWriteable(source); + out.writeMap(clones); } + if (out.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { + out.writeList(featureStates); + } + } + + @Override + public boolean isFragment() { + return false; } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 26c71c44b9161..4c04b50268476 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -992,7 +992,7 @@ public ClusterState execute(ClusterState currentState) { } final ClusterState res = readyDeletions( changed ? ClusterState.builder(currentState).putCustom( - SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(updatedSnapshotEntries))).build() : + SnapshotsInProgress.TYPE, SnapshotsInProgress.of(updatedSnapshotEntries)).build() : currentState).v1(); for (SnapshotDeletionsInProgress.Entry delete : res.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries()) { @@ -1486,8 +1486,7 @@ private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot sn } } if (changed) { - result = ClusterState.builder(state).putCustom( - SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build(); + result = ClusterState.builder(state).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); } return readyDeletions(result).v1(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/SnapshotDeletionsInProgressTests.java b/server/src/test/java/org/elasticsearch/cluster/SnapshotDeletionsInProgressTests.java index 60dd0848a6c96..38d6e545e3405 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SnapshotDeletionsInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/SnapshotDeletionsInProgressTests.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.Collections; +import java.util.List; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; @@ -22,8 +23,8 @@ public class SnapshotDeletionsInProgressTests extends ESTestCase { public void testXContent() throws IOException { SnapshotDeletionsInProgress sdip = - SnapshotDeletionsInProgress.newInstance(new SnapshotDeletionsInProgress.Entry(Collections.emptyList(), - "repo", 736694267638L, 0, SnapshotDeletionsInProgress.State.STARTED)); + SnapshotDeletionsInProgress.of(List.of(new SnapshotDeletionsInProgress.Entry(Collections.emptyList(), + "repo", 736694267638L, 0, SnapshotDeletionsInProgress.State.STARTED))); try (XContentBuilder builder = jsonBuilder()) { builder.humanReadable(true);