Skip to content

Commit

Permalink
Cleanup Duplicate Code in Snapshot State Machine (#71624)
Browse files Browse the repository at this point in the history
The logic for interpreting the primary routing entry was duplicated across
snapshotting and cloning needlessly. Also, dried up determining if there's an active
deletion for a repo since we were doing that operation in a number of spots as well.
  • Loading branch information
original-brownbear authored Apr 13, 2021
1 parent e910b2d commit d3d7220
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ public List<Entry> getEntries() {
return entries;
}

/**
* Checks if there is an actively executing delete operation for the given repository
*
* @param repository repository name
*/
public boolean hasExecutingDeletion(String repository) {
for (Entry entry : entries) {
if (entry.state() == State.STARTED && entry.repository().equals(repository)) {
return true;
}
}
return false;
}

/**
* Returns {@code true} if there are snapshot deletions in progress in the cluster,
* returns {@code false} otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,7 @@ public ClusterState execute(ClusterState currentState) {
final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder =
ImmutableOpenMap.builder();
final boolean readyToExecute = currentState.custom(
SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().stream()
.noneMatch(e -> e.repository().equals(repoName) && e.state() == SnapshotDeletionsInProgress.State.STARTED);
SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasExecutingDeletion(repoName) == false;
final InFlightShardSnapshotStates inFlightShardStates;
if (readyToExecute) {
inFlightShardStates = InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
Expand Down Expand Up @@ -1691,8 +1690,7 @@ public ClusterState execute(ClusterState currentState) {
repositoryData.getGenId(),
updatedSnapshots.entries().stream().filter(entry -> repoName.equals(entry.repository())).noneMatch(
SnapshotsService::isWritingToRepository)
&& deletionsInProgress.getEntries().stream().noneMatch(entry ->
repoName.equals(entry.repository()) && entry.state() == SnapshotDeletionsInProgress.State.STARTED)
&& deletionsInProgress.hasExecutingDeletion(repoName) == false
? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING);
} else {
newDelete = replacedEntry.withAddedSnapshots(snapshotIdsRequiringCleanup);
Expand Down Expand Up @@ -2123,8 +2121,7 @@ && alreadyReassigned(value.key.indexName(), value.key.shardId(), reassignedShard
}
// TODO: the below logic is very similar to that in #startCloning and both could be dried up against each other
// also the code for standard snapshots could make use of this breakout as well
if (canBeUpdated.isEmpty() || updatedDeletions.getEntries().stream().anyMatch(
e -> e.repository().equals(repoName) && e.state() == SnapshotDeletionsInProgress.State.STARTED)) {
if (canBeUpdated.isEmpty() || updatedDeletions.hasExecutingDeletion(repoName)) {
// No shards can be updated in this snapshot so we just add it as is again
snapshotEntries.add(entry);
} else {
Expand Down Expand Up @@ -2261,15 +2258,14 @@ private static <T> void completeListenersIgnoringException(@Nullable List<Action
* @return list of shard to be included into current snapshot
*/
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
SnapshotsInProgress snapshotsInProgress, @Nullable SnapshotDeletionsInProgress deletionsInProgress,
SnapshotsInProgress snapshotsInProgress, SnapshotDeletionsInProgress deletionsInProgress,
Metadata metadata, RoutingTable routingTable, List<IndexId> indices, boolean useShardGenerations,
RepositoryData repositoryData, String repoName) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
final InFlightShardSnapshotStates inFlightShardStates =
InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
.noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
final boolean readyToExecute = deletionsInProgress.hasExecutingDeletion(repoName) == false;
for (IndexId index : indices) {
final String indexName = index.getName();
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
Expand All @@ -2296,22 +2292,10 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
shardRepoGeneration = null;
}
final ShardSnapshotStatus shardSnapshotStatus;
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
if (readyToExecute == false || inFlightShardStates.isActive(shardId.getIndexName(), shardId.id())) {
shardSnapshotStatus = ShardSnapshotStatus.UNASSIGNED_QUEUED;
} else if (primary == null || primary.assignedToNode() == false) {
shardSnapshotStatus =
new ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated", shardRepoGeneration);
} else if (primary.relocating() || primary.initializing()) {
shardSnapshotStatus = new ShardSnapshotStatus(
primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration);
} else if (primary.started() == false) {
shardSnapshotStatus =
new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
"primary shard hasn't been started yet", shardRepoGeneration);
} else {
shardSnapshotStatus =
new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration);
shardSnapshotStatus = initShardSnapshotStatus(shardRepoGeneration, indexRoutingTable.shard(i).primaryShard());
}
builder.put(shardId, shardSnapshotStatus);
}
Expand All @@ -2321,6 +2305,28 @@ private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus
return builder.build();
}

/**
* Compute the snapshot status for a given shard based on the current primary routing entry for the shard.
*
* @param shardRepoGeneration repository generation of the shard in the repository
* @param primary primary routing entry for the shard
* @return shard snapshot status
*/
private static ShardSnapshotStatus initShardSnapshotStatus(String shardRepoGeneration, ShardRouting primary) {
ShardSnapshotStatus shardSnapshotStatus;
if (primary == null || primary.assignedToNode() == false) {
shardSnapshotStatus = new ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated", shardRepoGeneration);
} else if (primary.relocating() || primary.initializing()) {
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration);
} else if (primary.started() == false) {
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
"primary shard hasn't been started yet", shardRepoGeneration);
} else {
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration);
}
return shardSnapshotStatus;
}

/**
* Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the
* indices-to-check set.
Expand Down Expand Up @@ -2520,8 +2526,9 @@ public boolean assertAllListenersResolved() {
finishedStatus.nodeId(), finishedStatus.generation());
// A clone was updated, so we must use the correct data node id for the reassignment as actual shard
// snapshot
final ShardSnapshotStatus shardSnapshotStatus = startShardSnapshotAfterClone(currentState,
updateSnapshotState.updatedState.generation(), finishedRoutingShardId);
final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus(finishedStatus.generation(),
currentState.routingTable().index(finishedRoutingShardId.getIndex()).shard(finishedRoutingShardId.id())
.primaryShard());
shards.put(finishedRoutingShardId, shardSnapshotStatus);
if (shardSnapshotStatus.isActive()) {
// only remove the update from the list of tasks that might hold a reusable shard if we actually
Expand Down Expand Up @@ -2624,33 +2631,6 @@ public boolean assertAllListenersResolved() {
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
};

/**
* Creates a {@link ShardSnapshotStatus} entry for a snapshot after the shard has become available for snapshotting as a result
* of a snapshot clone completing.
*
* @param currentState current cluster state
* @param shardGeneration shard generation of the shard in the repository
* @param shardId shard id of the shard that just finished cloning
* @return shard snapshot status
*/
private static ShardSnapshotStatus startShardSnapshotAfterClone(ClusterState currentState, String shardGeneration, ShardId shardId) {
final ShardRouting primary = currentState.routingTable().index(shardId.getIndex()).shard(shardId.id()).primaryShard();
final ShardSnapshotStatus shardSnapshotStatus;
if (primary == null || primary.assignedToNode() == false) {
shardSnapshotStatus = new ShardSnapshotStatus(
null, ShardState.MISSING, "primary shard is not allocated", shardGeneration);
} else if (primary.relocating() || primary.initializing()) {
shardSnapshotStatus =
new ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING, shardGeneration);
} else if (primary.started() == false) {
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
"primary shard hasn't been started yet", shardGeneration);
} else {
shardSnapshotStatus = new ShardSnapshotStatus(primary.currentNodeId(), shardGeneration);
}
return shardSnapshotStatus;
}

/**
* An update to the snapshot state of a shard.
*
Expand Down

0 comments on commit d3d7220

Please sign in to comment.