From ae0d37a80d5eceb38d8b99932f7fb107ae15749f Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 16 Aug 2021 07:25:27 +0200
Subject: [PATCH] Refactor SnapshotsInProgress to Use RepositoryShardId for
 Concurrency Logic (#75501) (#76539)

This refactors the snapshots-in-progress logic to work from `RepositoryShardId`
when working out which parts of the repository are in use by writes, for
snapshot concurrency safety. This change does not yet go all the way on this
topic and there are a number of possible follow-up improvements to simplify
the logic that I'd work through over time. For now, though, it allows fixing
the remaining known issues that snapshot stress testing surfaced when combined
with the fix in https://github.com/elastic/elasticsearch/pull/75530.

These issues all stem from the fact that `ShardId` is not a stable key across
multiple snapshots if snapshots are partial. The broken scenarios are all
roughly this:

* snapshot-1 for index-A with uuid-A runs and is partial
* index-A is deleted and re-created and now has uuid-B
* snapshot-2 for index-A is started and is now queued up behind snapshot-1 for
  the index
* snapshot-1 finishes and the logic tries to start the next snapshot for the
  same shard id
* this fails because the shard id is not the same; we can't compare index
  uuids, just index name + shard id
* this change fixes all these spots by always taking the round trip via
  `RepositoryShardId`

Planned follow-ups here are:

* DRY up the logic across cloning and snapshotting more, as both now
  essentially run the same code in many state-machine steps
* serialize snapshots-in-progress efficiently instead of re-computing the
  index and by-repository-shard-id lookups in the constructor every time
* refactor the logic in snapshots-in-progress away from maps keyed by shard id
  in almost all spots; to this end, just keep an index-name-to-`Index` map to
  work out what exactly is being snapshotted
* refactor snapshots-in-progress to be a map of lists of operations keyed by
  repository shard id, instead of the list of maps it currently is, to make
  the concurrency simpler and more obviously correct

closes #75423
relates (#75339 ...
should also fix this, but I have to verify by testing with a backport to 7.x) --- .../snapshots/CloneSnapshotIT.java | 6 +- .../TransportSnapshotsStatusAction.java | 15 +- .../cluster/SnapshotsInProgress.java | 148 ++++++--- .../SnapshotInProgressAllocationDecider.java | 3 + .../elasticsearch/index/shard/ShardId.java | 3 +- .../repositories/RepositoryShardId.java | 2 +- .../InFlightShardSnapshotStates.java | 15 +- .../snapshots/SnapshotShardsService.java | 8 +- .../snapshots/SnapshotsService.java | 284 +++++++++--------- .../elasticsearch/snapshots/package-info.java | 4 +- .../snapshots/SnapshotsServiceTests.java | 4 +- 11 files changed, 275 insertions(+), 217 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index 02b66df66c8d1..60243b85c36e5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -578,7 +578,7 @@ public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throw awaitClusterState(clusterState -> { final List entries = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY) .entries(); - return entries.size() == 2 && entries.get(1).clones().isEmpty() == false; + return entries.size() == 2 && entries.get(1).shardsByRepoShardId().isEmpty() == false; }); assertFalse(blockedSnapshot.isDone()); } finally { @@ -615,9 +615,9 @@ public void testStartCloneDuringRunningDelete() throws Exception { logger.info("--> waiting for snapshot clone to be fully initialized"); awaitClusterState(state -> { for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { - if (entry.clones().isEmpty() == false) { + if (entry.shardsByRepoShardId().isEmpty() == false) { assertEquals(sourceSnapshot, entry.source().getName()); - for (ObjectCursor value : entry.clones().values()) { + for (ObjectCursor value : entry.shardsByRepoShardId().values()) { assertSame(value.value, SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED); } return true; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 7e5b71b4668bd..32ca85fcf7174 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -132,7 +133,7 @@ protected void masterOperation( Set nodesIds = new HashSet<>(); for (SnapshotsInProgress.Entry entry : currentSnapshots) { - for (ObjectCursor status : entry.shards().values()) { + for (ObjectCursor status : entry.shardsByRepoShardId().values()) { if (status.value.nodeId() != null) { nodesIds.add(status.value.nodeId()); } @@ -189,7 +190,8 @@ private void buildResponse( for 
(SnapshotsInProgress.Entry entry : currentSnapshotEntries) { currentSnapshotNames.add(entry.snapshot().getSnapshotId().getName()); List shardStatusBuilder = new ArrayList<>(); - for (ObjectObjectCursor shardEntry : entry.shards()) { + for (ObjectObjectCursor shardEntry : entry + .shardsByRepoShardId()) { SnapshotsInProgress.ShardSnapshotStatus status = shardEntry.value; if (status.nodeId() != null) { // We should have information about this shard from the shard: @@ -197,7 +199,8 @@ private void buildResponse( if (nodeStatus != null) { Map shardStatues = nodeStatus.status().get(entry.snapshot()); if (shardStatues != null) { - SnapshotIndexShardStatus shardStatus = shardStatues.get(shardEntry.key); + final ShardId sid = entry.shardId(shardEntry.key); + SnapshotIndexShardStatus shardStatus = shardStatues.get(sid); if (shardStatus != null) { // We have full information about this shard if (shardStatus.getStage() == SnapshotIndexShardStage.DONE && shardEntry.value.state() != SUCCESS) { @@ -207,7 +210,7 @@ private void buildResponse( // technically if the data node failed before successfully reporting DONE state to master, then // this shards state would jump to a failed state. shardStatus = new SnapshotIndexShardStatus( - shardEntry.key, + sid, SnapshotIndexShardStage.FINALIZE, shardStatus.getStats(), shardStatus.getNodeId(), @@ -246,7 +249,7 @@ private void buildResponse( if (stage == SnapshotIndexShardStage.DONE) { // Shard snapshot completed successfully so we should be able to load the exact statistics for this // shard from the repository already. - final ShardId shardId = shardEntry.key; + final ShardId shardId = entry.shardId(shardEntry.key); shardStatus = new SnapshotIndexShardStatus( shardId, repositoriesService.repository(entry.repository()) @@ -258,7 +261,7 @@ private void buildResponse( .asCopy() ); } else { - shardStatus = new SnapshotIndexShardStatus(shardEntry.key, stage); + shardStatus = new SnapshotIndexShardStatus(entry.shardId(shardEntry.key), stage); } shardStatusBuilder.add(shardStatus); } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 581f70d689a7d..068fa49585891 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoryOperation; @@ -69,7 +70,7 @@ public static SnapshotsInProgress of(List entries) { } public SnapshotsInProgress(StreamInput in) throws IOException { - this(in.readList(SnapshotsInProgress.Entry::new)); + this(in.readList(SnapshotsInProgress.Entry::readFrom)); } private SnapshotsInProgress(List entries) { @@ -203,12 +204,7 @@ private static boolean assertConsistentEntries(List entries) { final Map>> assignedShardsByRepo = new HashMap<>(); final Map>> queuedShardsByRepo = new HashMap<>(); for (Entry entry : entries) { - for (ObjectObjectCursor shard : entry.shards()) { - final ShardId sid = shard.key; - assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.getIndexName(), sid.id(), - shard.value); - } - for (ObjectObjectCursor shard : 
entry.clones()) { + for (ObjectObjectCursor shard : entry.shardsByRepoShardId()) { final RepositoryShardId sid = shard.key; assert assertShardStateConsistent(entries, assignedShardsByRepo, queuedShardsByRepo, entry, sid.indexName(), sid.shardId(), shard.value); @@ -526,6 +522,9 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation * Map of index name to {@link IndexId}. */ private final Map indices; + + private final Map snapshotIndices; + private final List dataStreams; private final List featureStates; private final long startTime; @@ -539,10 +538,9 @@ public static class Entry implements Writeable, ToXContent, RepositoryOperation private final SnapshotId source; /** - * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard clone operation in this entry - * the same way {@link #shards} tracks the status of each shard snapshot operation in non-clone entries. + * Map of {@link RepositoryShardId} to {@link ShardSnapshotStatus} tracking the state of each shard operation in this entry. */ - private final ImmutableOpenMap clones; + private final ImmutableOpenMap shardStatusByRepoShardId; @Nullable private final Map userMetadata; @Nullable private final String failure; @@ -560,7 +558,7 @@ private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, St List dataStreams, List featureStates, long startTime, long repositoryStateId, ImmutableOpenMap shards, String failure, Map userMetadata, Version version, @Nullable SnapshotId source, - @Nullable ImmutableOpenMap clones) { + ImmutableOpenMap shardStatusByRepoShardId) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -576,20 +574,36 @@ private Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, St this.version = version; this.source = source; if (source == null) { - assert clones == null || clones.isEmpty() : "Provided [" + clones + "] but no source"; - this.clones = ImmutableOpenMap.of(); + assert shardStatusByRepoShardId == null || shardStatusByRepoShardId.isEmpty() + : "Provided explict repo shard id statuses [" + shardStatusByRepoShardId + "] but no source"; + final Map res = new HashMap<>(indices.size()); + final ImmutableOpenMap.Builder byRepoShardIdBuilder = + ImmutableOpenMap.builder(shards.size()); + for (ObjectObjectCursor entry : shards) { + final ShardId shardId = entry.key; + final IndexId indexId = indices.get(shardId.getIndexName()); + final Index index = shardId.getIndex(); + final Index existing = res.put(indexId.getName(), index); + assert existing == null || existing.equals(index) : "Conflicting indices [" + existing + "] and [" + index + "]"; + byRepoShardIdBuilder.put(new RepositoryShardId(indexId, shardId.id()), entry.value); + } + this.shardStatusByRepoShardId = byRepoShardIdBuilder.build(); + snapshotIndices = org.elasticsearch.core.Map.copyOf(res); } else { - this.clones = clones; + assert shards.isEmpty(); + this.shardStatusByRepoShardId = shardStatusByRepoShardId; + snapshotIndices = org.elasticsearch.core.Map.of(); } - assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.clones); + assert assertShardsConsistent(this.source, this.state, this.indices, this.shards, this.shardStatusByRepoShardId); } - private Entry(StreamInput in) throws IOException { - snapshot = new Snapshot(in); - includeGlobalState = in.readBoolean(); - partial = in.readBoolean(); - state = State.fromValue(in.readByte()); + private static Entry readFrom(StreamInput in) 
throws IOException { + final Snapshot snapshot = new Snapshot(in); + final boolean includeGlobalState = in.readBoolean(); + final boolean partial = in.readBoolean(); + final State state = State.fromValue(in.readByte()); final int indexCount = in.readVInt(); + final Map indices; if (indexCount == 0) { indices = Collections.emptyMap(); } else { @@ -600,15 +614,17 @@ private Entry(StreamInput in) throws IOException { } indices = Collections.unmodifiableMap(idx); } - startTime = in.readLong(); - shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom); - repositoryStateId = in.readLong(); - failure = in.readOptionalString(); + final long startTime = in.readLong(); + final ImmutableOpenMap shards = in.readImmutableMap(ShardId::new, ShardSnapshotStatus::readFrom); + final long repositoryStateId = in.readLong(); + final String failure = in.readOptionalString(); + final Map userMetadata; if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { userMetadata = in.readMap(); } else { userMetadata = null; } + final Version version; if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { version = Version.readVersion(in); } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { @@ -619,28 +635,35 @@ private Entry(StreamInput in) throws IOException { } else { version = SnapshotsService.OLD_SNAPSHOT_FORMAT; } + final List dataStreams; if (in.getVersion().onOrAfter(DATA_STREAMS_IN_SNAPSHOT)) { dataStreams = in.readStringList(); } else { dataStreams = Collections.emptyList(); } + final SnapshotId source; + final ImmutableOpenMap clones; if (in.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { source = in.readOptionalWriteable(SnapshotId::new); clones = in.readImmutableMap(RepositoryShardId::new, ShardSnapshotStatus::readFrom); } else { source = null; - clones = ImmutableOpenMap.of(); + clones = null; } + final List featureStates; if (in.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { featureStates = Collections.unmodifiableList(in.readList(SnapshotFeatureInfo::new)); } else { featureStates = Collections.emptyList(); } + return new SnapshotsInProgress.Entry( + snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, repositoryStateId, + shards, failure, userMetadata, version, source, source == null ? 
null : clones); } private static boolean assertShardsConsistent(SnapshotId source, State state, Map indices, ImmutableOpenMap shards, - ImmutableOpenMap clones) { + ImmutableOpenMap statusByRepoShardId) { if ((state == State.INIT || state == State.ABORTED) && shards.isEmpty()) { return true; } @@ -654,15 +677,23 @@ private static boolean assertShardsConsistent(SnapshotId source, State state, Ma assert source == null || indexNames.isEmpty() == false : "No empty snapshot clones allowed"; assert source != null || indexNames.equals(indexNamesInShards) : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]"; - final boolean shardsCompleted = completed(shards.values()) && completed(clones.values()); + final boolean shardsCompleted = completed(shards.values()) && completed(statusByRepoShardId.values()); // Check state consistency for normal snapshots and started clone operations - if (source == null || clones.isEmpty() == false) { + if (source == null || statusByRepoShardId.isEmpty() == false) { assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false) : "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards; } if (source != null && state.completed()) { - assert hasFailures(clones) == false || state == State.FAILED - : "Failed shard clones in [" + clones + "] but state was [" + state + "]"; + assert hasFailures(statusByRepoShardId) == false || state == State.FAILED + : "Failed shard clones in [" + statusByRepoShardId + "] but state was [" + state + "]"; + } + if (source == null) { + assert shards.size() == statusByRepoShardId.size(); + for (ObjectObjectCursor entry : shards) { + final ShardId routingShardId = entry.key; + assert statusByRepoShardId.get(new RepositoryShardId(indices.get(routingShardId.getIndexName()), routingShardId.id())) + == entry.value : "found inconsistent values tracked by routing- and repository shard id"; + } } return true; } @@ -684,17 +715,18 @@ public Entry withRepoGen(long newRepoGen) { assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen + "] must be higher than current generation [" + repositoryStateId + "]"; return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, newRepoGen, - shards, failure, userMetadata, version, source, clones); + shards, failure, userMetadata, version, source, source == null ? ImmutableOpenMap.of() : shardStatusByRepoShardId); } public Entry withClones(ImmutableOpenMap updatedClones) { - if (updatedClones.equals(clones)) { + if (updatedClones.equals(shardStatusByRepoShardId)) { return this; } + assert shards.isEmpty(); return new Entry(snapshot, includeGlobalState, partial, completed(updatedClones.values()) ? (hasFailures(updatedClones) ? State.FAILED : State.SUCCESS) : - state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, failure, userMetadata, - version, source, updatedClones); + state, indices, dataStreams, featureStates, startTime, repositoryStateId, ImmutableOpenMap.of(), failure, + userMetadata, version, source, updatedClones); } /** @@ -726,12 +758,28 @@ public Entry abort() { if (allQueued) { return null; } - return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, ABORTED_FAILURE_TEXT); + return new Entry( + snapshot, + includeGlobalState, + partial, + completed ? 
State.SUCCESS : State.ABORTED, + indices, + dataStreams, + featureStates, + startTime, + repositoryStateId, + shardsBuilder.build(), + ABORTED_FAILURE_TEXT, + userMetadata, + version, + source, + null + ); } public Entry fail(ImmutableOpenMap shards, State state, String failure) { return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, - repositoryStateId, shards, failure, userMetadata, version, source, clones); + repositoryStateId, shards, failure, userMetadata, version, source, null); } /** @@ -757,7 +805,7 @@ public Entry withShardStates(ImmutableOpenMap shar public Entry withStartedShards(ImmutableOpenMap shards) { final SnapshotsInProgress.Entry updated = new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, featureStates, startTime, repositoryStateId, shards, failure, userMetadata, version); - assert updated.state().completed() == false && completed(updated.shards().values()) == false + assert updated.state().completed() == false && completed(updated.shardsByRepoShardId().values()) == false : "Only running snapshots allowed but saw [" + updated + "]"; return updated; } @@ -771,10 +819,24 @@ public Snapshot snapshot() { return this.snapshot; } + public ImmutableOpenMap shardsByRepoShardId() { + return shardStatusByRepoShardId; + } + + public Index indexByName(String name) { + return snapshotIndices.get(name); + } + public ImmutableOpenMap shards() { + assert isClone() == false : "tried to get routing shards for clone entry [" + this + "]"; return this.shards; } + public ShardId shardId(RepositoryShardId repositoryShardId) { + assert isClone() == false : "must not be called for clone [" + this + "]"; + return new ShardId(indexByName(repositoryShardId.indexName()), repositoryShardId.shardId()); + } + public State state() { return state; } @@ -832,10 +894,6 @@ public boolean isClone() { return source != null; } - public ImmutableOpenMap clones() { - return clones; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -856,7 +914,7 @@ public boolean equals(Object o) { if (Objects.equals(userMetadata, ((Entry) o).userMetadata) == false) return false; if (version.equals(entry.version) == false) return false; if (Objects.equals(source, ((Entry) o).source) == false) return false; - if (clones.equals(((Entry) o).clones) == false) return false; + if (shardStatusByRepoShardId.equals(((Entry) o).shardStatusByRepoShardId) == false) return false; if (featureStates.equals(entry.featureStates) == false) return false; return true; @@ -877,7 +935,7 @@ public int hashCode() { result = 31 * result + (userMetadata == null ? 0 : userMetadata.hashCode()); result = 31 * result + version.hashCode(); result = 31 * result + (source == null ? 
0 : source.hashCode()); - result = 31 * result + clones.hashCode(); + result = 31 * result + shardStatusByRepoShardId.hashCode(); result = 31 * result + featureStates.hashCode(); return result; } @@ -924,7 +982,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("source", source); builder.startArray("clones"); { - for (ObjectObjectCursor shardEntry : clones) { + for (ObjectObjectCursor shardEntry : shardStatusByRepoShardId) { RepositoryShardId shardId = shardEntry.key; writeShardSnapshotStatus(builder, shardId.index(), shardId.shardId(), shardEntry.value); } @@ -970,7 +1028,7 @@ public void writeTo(StreamOutput out) throws IOException { } if (out.getVersion().onOrAfter(SnapshotsService.CLONE_SNAPSHOT_VERSION)) { out.writeOptionalWriteable(source); - out.writeMap(clones); + out.writeMap(shardStatusByRepoShardId); } if (out.getVersion().onOrAfter(FEATURE_STATES_VERSION)) { out.writeList(featureStates); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java index abe3f6a417f5e..9bb4b98985002 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/SnapshotInProgressAllocationDecider.java @@ -55,6 +55,9 @@ private Decision canMove(ShardRouting shardRouting, RoutingAllocation allocation } for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { + if (snapshot.isClone()) { + continue; + } SnapshotsInProgress.ShardSnapshotStatus shardSnapshotStatus = snapshot.shards().get(shardRouting.shardId()); if (shardSnapshotStatus != null && shardSnapshotStatus.state().completed() == false && shardSnapshotStatus.nodeId() != null && shardSnapshotStatus.nodeId().equals(shardRouting.currentNodeId())) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardId.java b/server/src/main/java/org/elasticsearch/index/shard/ShardId.java index 2e2edbc0d59cb..44327a1632cf7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardId.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardId.java @@ -17,6 +17,7 @@ import org.elasticsearch.index.Index; import java.io.IOException; +import java.util.Objects; /** * Allows for shard level components to be injected with the shard id. 
@@ -28,7 +29,7 @@ public class ShardId implements Comparable, ToXContentFragment, Writeab private final int hashCode; public ShardId(Index index, int shardId) { - this.index = index; + this.index = Objects.requireNonNull(index); this.shardId = shardId; this.hashCode = computeHashCode(); } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryShardId.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryShardId.java index 1abb7769aed05..369812f62b9a7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryShardId.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryShardId.java @@ -26,7 +26,7 @@ public final class RepositoryShardId implements Writeable { public RepositoryShardId(IndexId index, int shard) { assert index != null; - this.index = index; + this.index = Objects.requireNonNull(index); this.shard = shard; } diff --git a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java index ae0be06bed3b8..1091e89b4e993 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java +++ b/server/src/main/java/org/elasticsearch/snapshots/InFlightShardSnapshotStates.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.core.Nullable; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGenerations; @@ -47,16 +46,10 @@ public static InFlightShardSnapshotStates forRepo(String repoName, List clone : runningSnapshot.clones()) { - final RepositoryShardId repoShardId = clone.key; - addStateInformation(generations, busyIds, clone.value, repoShardId.shardId(), repoShardId.indexName()); - } - } else { - for (ObjectObjectCursor shard : runningSnapshot.shards()) { - final ShardId sid = shard.key; - addStateInformation(generations, busyIds, shard.value, sid.id(), sid.getIndexName()); - } + for (ObjectObjectCursor shard : runningSnapshot + .shardsByRepoShardId()) { + final RepositoryShardId sid = shard.key; + addStateInformation(generations, busyIds, shard.value, sid.shardId(), sid.indexName()); } } return new InFlightShardSnapshotStates(generations, busyIds); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 90e3c9da0e0b5..01350e4623991 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -45,6 +45,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryShardId; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.ShardSnapshotResult; import org.elasticsearch.repositories.SnapshotShardContext; @@ -224,13 +225,14 @@ private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress) { // Abort all running shards for this snapshot final Snapshot snapshot = entry.snapshot(); Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); - for (ObjectObjectCursor shard : entry.shards()) { - final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); + 
for (ObjectObjectCursor shard : entry.shardsByRepoShardId()) { + final ShardId sid = entry.shardId(shard.key); + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(sid); if (snapshotStatus == null) { // due to CS batching we might have missed the INIT state and straight went into ABORTED // notify master that abort has completed by moving to FAILED if (shard.value.state() == ShardState.ABORTED && localNodeId.equals(shard.value.nodeId())) { - notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason()); + notifyFailedSnapshotShard(snapshot, sid, shard.value.reason()); } } else { snapshotStatus.abortIfNotCompleted("snapshot has been aborted"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 5afc2f301e8ec..6ef8b8eacffb3 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -543,8 +543,7 @@ public ClusterState execute(ClusterState currentState) { ImmutableOpenMap shards = shards( snapshots, deletionsInProgress, - currentState.metadata(), - currentState.routingTable(), + currentState, indexIds.values(), useShardGenerations(version), repositoryData, @@ -882,7 +881,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS if (updatedEntry != null) { final Snapshot target = updatedEntry.snapshot(); final SnapshotId sourceSnapshot = updatedEntry.source(); - for (ObjectObjectCursor indexClone : updatedEntry.clones()) { + for (ObjectObjectCursor indexClone : updatedEntry.shardsByRepoShardId()) { final ShardSnapshotStatus shardStatusBefore = indexClone.value; if (shardStatusBefore.state() != ShardState.INIT) { continue; @@ -1137,8 +1136,7 @@ public ClusterState execute(ClusterState currentState) { ImmutableOpenMap shards = shards( snapshots, currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY), - currentState.metadata(), - currentState.routingTable(), + currentState, indexIds.values(), useShardGenerations(version), repositoryData, @@ -1277,21 +1275,15 @@ public void onNoLongerMaster() { private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { ShardGenerations.Builder builder = ShardGenerations.builder(); if (snapshot.isClone()) { - snapshot.clones().forEach(c -> { - final IndexId indexId = snapshot.indices().get(c.key.indexName()); - builder.put(indexId, c.key.shardId(), c.value.generation()); - }); + snapshot.shardsByRepoShardId().forEach(c -> builder.put(c.key.index(), c.key.shardId(), c.value.generation())); } else { - snapshot.shards().forEach(c -> { - if (metadata.index(c.key.getIndex()) == null) { - assert snapshot.partial() - : "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial."; + snapshot.shardsByRepoShardId().forEach(c -> { + final Index index = snapshot.indexByName(c.key.indexName()); + if (metadata.index(index) == null) { + assert snapshot.partial() : "Index [" + index + "] was deleted during a snapshot but snapshot was not partial."; return; } - final IndexId indexId = snapshot.indices().get(c.key.getIndexName()); - if (indexId != null) { - builder.put(indexId, c.key.id(), c.value.generation()); - } + builder.put(c.key.index(), c.key.shardId(), c.value.generation()); }); } return builder.build(); @@ -1493,7 +1485,7 @@ private static boolean assertNoDanglingSnapshots(ClusterState state) { 
final Set reposSeen = new HashSet<>(); for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (reposSeen.add(entry.repository())) { - for (ObjectCursor value : (entry.isClone() ? entry.clones() : entry.shards()).values()) { + for (ObjectCursor value : entry.shardsByRepoShardId().values()) { if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { assert reposWithRunningDelete.contains(entry.repository()) : "Found shard snapshot waiting to be assigned in [" + entry + "] but it is not blocked by any running delete"; @@ -1561,7 +1553,7 @@ public ClusterState execute(ClusterState currentState) { for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { if (statesToUpdate.contains(snapshot.state())) { // Currently initializing clone - if (snapshot.isClone() && snapshot.clones().isEmpty()) { + if (snapshot.isClone() && snapshot.shardsByRepoShardId().isEmpty()) { if (initializingClones.contains(snapshot.snapshot())) { updatedSnapshotEntries.add(snapshot); } else { @@ -1570,7 +1562,7 @@ public ClusterState execute(ClusterState currentState) { } } else { ImmutableOpenMap shards = processWaitingShardsAndRemovedNodes( - snapshot.shards(), + snapshot, routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()) @@ -1593,7 +1585,7 @@ public ClusterState execute(ClusterState currentState) { changed = true; logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot); } else { - if (snapshot.state().completed() || completed(snapshot.shards().values())) { + if (snapshot.state().completed() || completed(snapshot.shardsByRepoShardId().values())) { finishedSnapshots.add(snapshot); } updatedSnapshotEntries.add(snapshot); @@ -1665,16 +1657,19 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( - ImmutableOpenMap snapshotShards, + SnapshotsInProgress.Entry entry, RoutingTable routingTable, DiscoveryNodes nodes, Map knownFailures ) { + if (entry.isClone()) { + return null; + } boolean snapshotChanged = false; ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - for (ObjectObjectCursor shardEntry : snapshotShards) { + for (ObjectObjectCursor shardEntry : entry.shardsByRepoShardId()) { ShardSnapshotStatus shardStatus = shardEntry.value; - ShardId shardId = shardEntry.key; + ShardId shardId = entry.shardId(shardEntry.key); if (shardStatus.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED)) { // this shard snapshot is waiting for a previous snapshot to finish execution for this shard final ShardSnapshotStatus knownFailure = knownFailures.get(shardId); @@ -1749,18 +1744,20 @@ private static ImmutableOpenMap processWaitingShar private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (entry.state() == State.STARTED) { - for (ObjectObjectCursor shardStatus : entry.shards()) { + for (ObjectObjectCursor shardStatus : entry.shardsByRepoShardId()) { if (shardStatus.value.state() != ShardState.WAITING) { continue; } - final ShardId shardId = shardStatus.key; - if (event.indexRoutingTableChanged(shardId.getIndexName())) { - IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(shardId.getIndex()); + final RepositoryShardId shardId = shardStatus.key; + if (event.indexRoutingTableChanged(shardId.indexName())) { + IndexRoutingTable indexShardRoutingTable = 
event.state() + .getRoutingTable() + .index(entry.indexByName(shardId.indexName())); if (indexShardRoutingTable == null) { // index got removed concurrently and we have to fail WAITING state shards return true; } - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.shardId()).primaryShard(); if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { return true; } @@ -1778,11 +1775,11 @@ private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsIn } final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); return snapshotsInProgress.entries().stream().anyMatch(snapshot -> { - if (snapshot.state().completed()) { - // nothing to do for already completed snapshots + if (snapshot.state().completed() || snapshot.isClone()) { + // nothing to do for already completed snapshots or clones that run on master anyways return false; } - for (ObjectCursor shardStatus : snapshot.shards().values()) { + for (ObjectCursor shardStatus : snapshot.shardsByRepoShardId().values()) { final ShardSnapshotStatus shardSnapshotStatus = shardStatus.value; if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status @@ -1904,18 +1901,18 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit final List finalIndices = shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()); final Set indexNames = new HashSet<>(finalIndices); ArrayList shardFailures = new ArrayList<>(); - for (ObjectObjectCursor shardStatus : entry.shards()) { - ShardId shardId = shardStatus.key; - if (indexNames.contains(shardId.getIndexName()) == false) { + for (ObjectObjectCursor shardStatus : entry.shardsByRepoShardId()) { + RepositoryShardId shardId = shardStatus.key; + if (indexNames.contains(shardId.indexName()) == false) { assert entry.partial() : "only ignoring shard failures for concurrently deleted indices for partial snapshots"; continue; } ShardSnapshotStatus status = shardStatus.value; final ShardState state = status.state(); if (state.failed()) { - shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); + shardFailures.add(new SnapshotShardFailure(status.nodeId(), entry.shardId(shardId), status.reason())); } else if (state.completed() == false) { - shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, "skipped")); + shardFailures.add(new SnapshotShardFailure(status.nodeId(), entry.shardId(shardId), "skipped")); } else { assert state == ShardState.SUCCESS; } @@ -1954,8 +1951,8 @@ private void finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit final Metadata metaForSnapshot = metadataForSnapshot(entry, meta); final Map indexSnapshotDetails = new HashMap<>(finalIndices.size()); - for (ObjectObjectCursor shardEntry : entry.shards()) { - indexSnapshotDetails.compute(shardEntry.key.getIndexName(), (indexName, current) -> { + for (ObjectObjectCursor shardEntry : entry.shardsByRepoShardId()) { + indexSnapshotDetails.compute(shardEntry.key.indexName(), (indexName, current) -> { if (current == SnapshotInfo.IndexSnapshotDetails.SKIPPED) { // already found an unsuccessful shard in this index, skip this shard return current; @@ -1993,7 +1990,7 @@ private void 
finalizeSnapshotEntry(Snapshot snapshot, Metadata metadata, Reposit entry.partial() ? onlySuccessfulFeatureStates(entry, finalIndices) : entry.featureStates(), failure, threadPool.absoluteTimeInMillis(), - entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), + entry.partial() ? shardGenerations.totalShards() : entry.shardsByRepoShardId().size(), shardFailures, entry.includeGlobalState(), // TODO: remove this hack making the metadata mutable once @@ -2033,10 +2030,10 @@ private List onlySuccessfulFeatureStates(SnapshotsInProgres // Figure out which indices have unsuccessful shards Set indicesWithUnsuccessfulShards = new HashSet<>(); - entry.shards().keysIt().forEachRemaining(shardId -> { - final ShardState shardState = entry.shards().get(shardId).state(); + entry.shardsByRepoShardId().iterator().forEachRemaining(shard -> { + final ShardState shardState = shard.value.state(); if (shardState.failed() || shardState.completed() == false) { - indicesWithUnsuccessfulShards.add(shardId.getIndexName()); + indicesWithUnsuccessfulShards.add(shard.key.indexName()); } }); @@ -2234,71 +2231,69 @@ private static ClusterState stateWithoutSuccessfulSnapshot(ClusterState state, S if (removedEntry.isClone()) { if (previousEntry.isClone()) { ImmutableOpenMap.Builder updatedShardAssignments = null; - for (ObjectObjectCursor finishedShardEntry : removedEntry.clones()) { + for (ObjectObjectCursor finishedShardEntry : removedEntry + .shardsByRepoShardId()) { final ShardSnapshotStatus shardState = finishedShardEntry.value; if (shardState.state() == ShardState.SUCCESS) { updatedShardAssignments = maybeAddUpdatedAssignment( updatedShardAssignments, shardState, finishedShardEntry.key, - previousEntry.clones() + previousEntry.shardsByRepoShardId() ); } } addCloneEntry(entries, previousEntry, updatedShardAssignments); } else { ImmutableOpenMap.Builder updatedShardAssignments = null; - for (ObjectObjectCursor finishedShardEntry : removedEntry.clones()) { + for (ObjectObjectCursor finishedShardEntry : removedEntry + .shardsByRepoShardId()) { final ShardSnapshotStatus shardState = finishedShardEntry.value; - if (shardState.state() != ShardState.SUCCESS) { - continue; - } - final RepositoryShardId repoShardId = finishedShardEntry.key; - final IndexMetadata indexMeta = state.metadata().index(repoShardId.indexName()); - if (indexMeta == null) { - // The index name that finished cloning does not exist in the cluster state so it isn't relevant - // to the running snapshot + final RepositoryShardId repositoryShardId = finishedShardEntry.key; + if (shardState.state() != ShardState.SUCCESS + || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) { continue; } updatedShardAssignments = maybeAddUpdatedAssignment( updatedShardAssignments, shardState, - new ShardId(indexMeta.getIndex(), repoShardId.shardId()), + previousEntry.shardId(repositoryShardId), previousEntry.shards() ); + } addSnapshotEntry(entries, previousEntry, updatedShardAssignments); } } else { if (previousEntry.isClone()) { ImmutableOpenMap.Builder updatedShardAssignments = null; - for (ObjectObjectCursor finishedShardEntry : removedEntry.shards()) { + for (ObjectObjectCursor finishedShardEntry : removedEntry + .shardsByRepoShardId()) { final ShardSnapshotStatus shardState = finishedShardEntry.value; - if (shardState.state() != ShardState.SUCCESS) { - continue; - } - final ShardId shardId = finishedShardEntry.key; - final IndexId indexId = removedEntry.indices().get(shardId.getIndexName()); - if (indexId == null) { + final 
RepositoryShardId repositoryShardId = finishedShardEntry.key; + if (shardState.state() != ShardState.SUCCESS + || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) { continue; } updatedShardAssignments = maybeAddUpdatedAssignment( updatedShardAssignments, shardState, - new RepositoryShardId(indexId, shardId.getId()), - previousEntry.clones() + repositoryShardId, + previousEntry.shardsByRepoShardId() ); } addCloneEntry(entries, previousEntry, updatedShardAssignments); } else { ImmutableOpenMap.Builder updatedShardAssignments = null; - for (ObjectObjectCursor finishedShardEntry : removedEntry.shards()) { + for (ObjectObjectCursor finishedShardEntry : removedEntry + .shardsByRepoShardId()) { final ShardSnapshotStatus shardState = finishedShardEntry.value; - if (shardState.state() == ShardState.SUCCESS) { + if (shardState.state() == ShardState.SUCCESS + && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.key)) { updatedShardAssignments = maybeAddUpdatedAssignment( updatedShardAssignments, shardState, - finishedShardEntry.key, + previousEntry.shardId(finishedShardEntry.key), previousEntry.shards() ); } @@ -2341,7 +2336,7 @@ private static void addCloneEntry( entries.add(entryToUpdate); } else { final ImmutableOpenMap.Builder updatedStatus = ImmutableOpenMap.builder( - entryToUpdate.clones() + entryToUpdate.shardsByRepoShardId() ); updatedStatus.putAll(updatedShardAssignments.build()); entries.add(entryToUpdate.withClones(updatedStatus.build())); @@ -2921,7 +2916,7 @@ private static boolean isWritingToRepository(SnapshotsInProgress.Entry entry) { // Entry is writing to the repo because it's finalizing on master return true; } - for (ObjectCursor value : (entry.isClone() ? entry.clones() : entry.shards()).values()) { + for (ObjectCursor value : entry.shardsByRepoShardId().values()) { if (value.value.isActive()) { // Entry is writing to the repo because it's writing to a shard on a data node or waiting to do so for a concrete shard return true; @@ -3247,14 +3242,12 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState // Keep track of shardIds that we started snapshots for as a result of removing this delete so we don't assign // them to multiple snapshots by accident - final Map> reassignedShardIds = new HashMap<>(); + final Set reassignedShardIds = new HashSet<>(); boolean changed = false; final String localNodeId = currentState.nodes().getLocalNodeId(); final String repoName = deleteEntry.repository(); - // Computing the new assignments can be quite costly, only do it once below if actually needed - ImmutableOpenMap shardAssignments = null; InFlightShardSnapshotStates inFlightShardStates = null; for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (entry.repository().equals(repoName)) { @@ -3264,9 +3257,9 @@ private SnapshotsInProgress updatedSnapshotsInProgress(ClusterState currentState if (entry.isClone()) { // Collect waiting shards from that entry that we can assign now that we are done with the deletion final List canBeUpdated = new ArrayList<>(); - for (ObjectObjectCursor value : entry.clones()) { + for (ObjectObjectCursor value : entry.shardsByRepoShardId()) { if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED) - && alreadyReassigned(value.key.indexName(), value.key.shardId(), reassignedShardIds) == false) { + && reassignedShardIds.contains(value.key) == false) { canBeUpdated.add(value.key); } } @@ -3280,10 +3273,10 @@ && alreadyReassigned(value.key.indexName(), value.key.shardId(), 
reassignedShard inFlightShardStates = InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries()); } final ImmutableOpenMap.Builder updatedAssignmentsBuilder = - ImmutableOpenMap.builder(entry.clones()); + ImmutableOpenMap.builder(entry.shardsByRepoShardId()); for (RepositoryShardId shardId : canBeUpdated) { if (inFlightShardStates.isActive(shardId.indexName(), shardId.shardId()) == false) { - markShardReassigned(shardId.indexName(), shardId.shardId(), reassignedShardIds); + markShardReassigned(shardId, reassignedShardIds); updatedAssignmentsBuilder.put( shardId, new ShardSnapshotStatus( @@ -3302,10 +3295,10 @@ && alreadyReassigned(value.key.indexName(), value.key.shardId(), reassignedShard } } else { // Collect waiting shards that in entry that we can assign now that we are done with the deletion - final List canBeUpdated = new ArrayList<>(); - for (ObjectObjectCursor value : entry.shards()) { + final List canBeUpdated = new ArrayList<>(); + for (ObjectObjectCursor value : entry.shardsByRepoShardId()) { if (value.value.equals(ShardSnapshotStatus.UNASSIGNED_QUEUED) - && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShardIds) == false) { + && reassignedShardIds.contains(value.key) == false) { canBeUpdated.add(value.key); } } @@ -3313,30 +3306,28 @@ && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShar // No shards can be updated in this snapshot so we just add it as is again snapshotEntries.add(entry); } else { - if (shardAssignments == null) { - shardAssignments = shards( - snapshotsInProgress, - updatedDeletions, - currentState.metadata(), - currentState.routingTable(), - entry.indices().values(), - entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), - repositoryData, - repoName - ); - } + final ImmutableOpenMap shardAssignments = shards( + snapshotsInProgress, + updatedDeletions, + currentState, + entry.indices().values(), + entry.version().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), + repositoryData, + repoName + ); final ImmutableOpenMap.Builder updatedAssignmentsBuilder = ImmutableOpenMap .builder(entry.shards()); - for (ShardId shardId : canBeUpdated) { - final ShardSnapshotStatus updated = shardAssignments.get(shardId); + for (RepositoryShardId shardId : canBeUpdated) { + final ShardId sid = entry.shardId(shardId); + final ShardSnapshotStatus updated = shardAssignments.get(sid); if (updated == null) { // We don't have a new assignment for this shard because its index was concurrently deleted - assert currentState.routingTable().hasIndex(shardId.getIndex()) == false - : "Missing assignment for [" + shardId + "]"; - updatedAssignmentsBuilder.put(shardId, ShardSnapshotStatus.MISSING); + assert currentState.routingTable().hasIndex(sid.getIndex()) == false + : "Missing assignment for [" + sid + "]"; + updatedAssignmentsBuilder.put(sid, ShardSnapshotStatus.MISSING); } else { - markShardReassigned(shardId.getIndexName(), shardId.id(), reassignedShardIds); - updatedAssignmentsBuilder.put(shardId, updated); + markShardReassigned(shardId, reassignedShardIds); + updatedAssignmentsBuilder.put(sid, updated); } } final SnapshotsInProgress.Entry updatedEntry = entry.withShardStates(updatedAssignmentsBuilder.build()); @@ -3361,13 +3352,9 @@ && alreadyReassigned(value.key.getIndexName(), value.key.getId(), reassignedShar return changed ? 
SnapshotsInProgress.of(snapshotEntries) : null; } - private void markShardReassigned(String indexName, int shardId, Map> reassignments) { - final boolean added = reassignments.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId); - assert added : "should only ever reassign each shard once but assigned [" + indexName + "][" + shardId + "] multiple times"; - } - - private boolean alreadyReassigned(String indexName, int shardId, Map> reassignments) { - return reassignments.getOrDefault(indexName, Collections.emptySet()).contains(shardId); + private void markShardReassigned(RepositoryShardId shardId, Set reassignments) { + final boolean added = reassignments.add(shardId); + assert added : "should only ever reassign each shard once but assigned [" + shardId + "] multiple times"; } } @@ -3431,8 +3418,7 @@ private static void completeListenersIgnoringException(@Nullable List shards( SnapshotsInProgress snapshotsInProgress, SnapshotDeletionsInProgress deletionsInProgress, - Metadata metadata, - RoutingTable routingTable, + ClusterState currentState, Collection indices, boolean useShardGenerations, RepositoryData repositoryData, @@ -3448,12 +3434,12 @@ private static ImmutableOpenMap clonesBuilder() { assert shardsBuilder == null; if (clonesBuilder == null) { - clonesBuilder = ImmutableOpenMap.builder(entry.clones()); + clonesBuilder = ImmutableOpenMap.builder(entry.shardsByRepoShardId()); } return clonesBuilder; } @@ -4029,7 +4027,7 @@ private void startExecutableClones(SnapshotsInProgress snapshotsInProgress, @Nul for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { if (entry.isClone() && entry.state() == State.STARTED && (repoName == null || entry.repository().equals(repoName))) { // this is a clone, see if new work is ready - for (ObjectObjectCursor clone : entry.clones()) { + for (ObjectObjectCursor clone : entry.shardsByRepoShardId()) { if (clone.value.state() == ShardState.INIT) { runReadyClone( entry.snapshot(), diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java index 91ba65c49b856..2df9a96656d01 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java +++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java @@ -98,8 +98,8 @@ *
    *
  1. First, {@link org.elasticsearch.snapshots.SnapshotsService#cloneSnapshot} is invoked which will place a placeholder entry into * {@code SnapshotsInProgress} that does not yet contain any shard clone assignments. Note that unlike in the case of snapshot - * creation, the shard level clone tasks in {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#clones} are not created in the - * initial cluster state update as is done for shard snapshot assignments in + * creation, the shard level clone tasks in {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#shardsByRepoShardId()} are not + * created in the initial cluster state update as is done for shard snapshot assignments in * {@link org.elasticsearch.cluster.SnapshotsInProgress.Entry#shards}. This is due to the fact that shard snapshot assignments are * computed purely from information in the current cluster state while shard clone assignments require information to be read from the * repository, which is too slow of a process to be done inside a cluster state update. Loading this information ahead of creating a diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java index 3385bd7872553..babfa5a1d955c 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsServiceTests.java @@ -301,7 +301,7 @@ public void testCompletedSnapshotStartsClone() throws Exception { assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); - final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.clones().get(repositoryShardId); + final SnapshotsInProgress.ShardSnapshotStatus shardCloneStatus = startedSnapshot.shardsByRepoShardId().get(repositoryShardId); assertThat(shardCloneStatus.state(), is(SnapshotsInProgress.ShardState.INIT)); assertThat(shardCloneStatus.nodeId(), is(updatedClusterState.nodes().getLocalNodeId())); assertIsNoop(updatedClusterState, completeShard); @@ -382,7 +382,7 @@ public void testCompletedCloneStartsNextClone() throws Exception { assertThat(completedClone.state(), is(SnapshotsInProgress.State.SUCCESS)); final SnapshotsInProgress.Entry startedSnapshot = snapshotsInProgress.entries().get(1); assertThat(startedSnapshot.state(), is(SnapshotsInProgress.State.STARTED)); - assertThat(startedSnapshot.clones().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT)); + assertThat(startedSnapshot.shardsByRepoShardId().get(shardId1).state(), is(SnapshotsInProgress.ShardState.INIT)); assertIsNoop(updatedClusterState, completeShardClone); }
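
To make the scenario from the commit message concrete: the standalone sketch below uses hypothetical, simplified types (not the real Elasticsearch classes; Java records for brevity) to show why a `ShardId`-style key carrying the index UUID stops matching after a delete/re-create, while a repository-level key built from `IndexId` keeps matching. It assumes, as the patch relies on, that the repository resolves the same `IndexId` for a re-created index of the same name.

```java
// Minimal sketch (hypothetical types): why ShardId is an unstable queue key
// across partial snapshots, while a repository-level key is stable.
final class KeyStabilitySketch {

    // ShardId equality includes the index UUID, so deleting and re-creating
    // "index-A" yields a key that no longer matches the queued one.
    record ShardId(String indexName, String indexUuid, int shard) {}

    // The repository-level key is built from IndexId, which the repository
    // keeps resolving to the same value for "index-A" across re-creation.
    record IndexId(String name, String id) {}
    record RepositoryShardId(IndexId index, int shard) {}

    public static void main(String[] args) {
        // snapshot-1 runs against index-A with uuid-A and is partial
        ShardId first = new ShardId("index-A", "uuid-A", 0);
        // index-A is deleted and re-created with uuid-B; snapshot-2 queues up
        ShardId second = new ShardId("index-A", "uuid-B", 0);
        System.out.println(first.equals(second)); // false -> queued shard never starts

        IndexId repoIndex = new IndexId("index-A", "repo-uuid-1");
        RepositoryShardId r1 = new RepositoryShardId(repoIndex, 0);
        RepositoryShardId r2 = new RepositoryShardId(repoIndex, 0);
        System.out.println(r1.equals(r2)); // true -> stable key for concurrency logic
    }
}
```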
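The centerpiece of the patch is `SnapshotsInProgress.Entry` deriving a by-`RepositoryShardId` view of its shard statuses in the constructor, plus an index-name-to-`Index` lookup so that `shardId(RepositoryShardId)` can translate back to routing-level keys for consumers such as `TransportSnapshotsStatusAction` and `SnapshotShardsService`. Below is a condensed, hypothetical sketch of that derivation: a `String` stands in for `ShardSnapshotStatus`, plain `HashMap` replaces `ImmutableOpenMap`, and the clone branch and consistency asserts are omitted.

```java
import java.util.HashMap;
import java.util.Map;

final class EntrySketch {
    record Index(String name, String uuid) {}
    record ShardId(Index index, int id) {}
    record IndexId(String name, String id) {}
    record RepositoryShardId(IndexId index, int id) {}

    final Map<String, IndexId> indices;                         // index name -> repo IndexId
    final Map<RepositoryShardId, String> byRepoShardId = new HashMap<>();
    final Map<String, Index> snapshotIndices = new HashMap<>(); // index name -> cluster Index

    EntrySketch(Map<String, IndexId> indices, Map<ShardId, String> shards) {
        this.indices = indices;
        // Re-key every by-ShardId status under its repository-level shard id,
        // remembering which concrete Index backs each index name.
        for (Map.Entry<ShardId, String> e : shards.entrySet()) {
            ShardId sid = e.getKey();
            IndexId indexId = indices.get(sid.index().name());
            snapshotIndices.put(indexId.name(), sid.index());
            byRepoShardId.put(new RepositoryShardId(indexId, sid.id()), e.getValue());
        }
    }

    // The round trip used throughout the patch: from the repository-level key
    // back to the routing-level ShardId of the index being snapshotted.
    ShardId shardId(RepositoryShardId repoShardId) {
        return new ShardId(snapshotIndices.get(repoShardId.index().name()), repoShardId.id());
    }

    public static void main(String[] args) {
        Index clusterIndex = new Index("index-A", "uuid-B");
        Map<String, IndexId> indices = Map.of("index-A", new IndexId("index-A", "repo-uuid-1"));
        EntrySketch entry = new EntrySketch(indices, Map.of(new ShardId(clusterIndex, 0), "INIT"));
        RepositoryShardId repoKey = new RepositoryShardId(indices.get("index-A"), 0);
        System.out.println(entry.byRepoShardId.get(repoKey)); // INIT
        System.out.println(entry.shardId(repoKey));           // round trip back to a ShardId
    }
}
```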
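A small but representative simplification the stable key buys, mirrored from `updatedSnapshotsInProgress` in the diff: tracking which shards were already reassigned after a delete collapses from a map of index name to shard-number sets into a single `Set<RepositoryShardId>`. A minimal sketch with hypothetical stand-in types:

```java
import java.util.HashSet;
import java.util.Set;

final class ReassignmentSketch {
    record IndexId(String name, String id) {}
    record RepositoryShardId(IndexId index, int shard) {}

    // Before the patch this was a Map<String, Set<Integer>> keyed by index
    // name, because ShardId itself was not a safe key across re-creation.
    private final Set<RepositoryShardId> reassignedShardIds = new HashSet<>();

    // Mirrors the patched markShardReassigned: each repository shard may be
    // handed to at most one newly started snapshot when a delete finishes.
    void markShardReassigned(RepositoryShardId shardId) {
        final boolean added = reassignedShardIds.add(shardId);
        assert added : "should only ever reassign each shard once but assigned [" + shardId + "] multiple times";
    }

    boolean alreadyReassigned(RepositoryShardId shardId) {
        return reassignedShardIds.contains(shardId);
    }
}
```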