Finalize all snapshots completed by shard snapshot updates #105245

6 changes: 6 additions & 0 deletions docs/changelog/105245.yaml
@@ -0,0 +1,6 @@
+pr: 105245
+summary: Finalize all snapshots completed by shard snapshot updates
+area: Snapshot/Restore
+type: bug
+issues:
+ - 104939
106 changes: 76 additions & 30 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -23,6 +23,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@@ -70,7 +71,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
-import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
@@ -188,6 +188,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement

private final MasterServiceTaskQueue<SnapshotTask> masterServiceTaskQueue;

+private final ShardSnapshotUpdateCompletionHandler shardSnapshotUpdateCompletionHandler;
+
/**
* Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the
* cluster state. The number of concurrent operations in a cluster state is defined as the sum of
@@ -243,6 +245,8 @@ public SnapshotsService(
Priority.NORMAL,
UpdateNodeIdsForRemovalTask::executeBatch
);

+this.shardSnapshotUpdateCompletionHandler = this::handleShardSnapshotUpdateCompletion;
}

/**
@@ -3129,17 +3133,23 @@ static final class SnapshotShardsUpdateContext {
// updates that were used to update an existing in-progress shard snapshot
private final Set<ShardSnapshotUpdate> executedUpdates = new HashSet<>();

-// enqueues a reroute because some shard snapshots finished
-private final Runnable rerouteRunnable;
+// handles the completion of some shard-snapshot updates, performing the next possible actions
+private final ShardSnapshotUpdateCompletionHandler completionHandler;
+
+// whether to execute a reroute on completion
+private boolean needsReroute;
+
+// entries that became complete due to this batch of updates
+private final List<SnapshotsInProgress.Entry> newlyCompletedEntries = new ArrayList<>();

SnapshotShardsUpdateContext(
ClusterStateTaskExecutor.BatchExecutionContext<SnapshotTask> batchExecutionContext,
-Runnable rerouteRunnable
+ShardSnapshotUpdateCompletionHandler completionHandler
) {
this.batchExecutionContext = batchExecutionContext;
this.initialState = batchExecutionContext.initialState();
this.nodeIdRemovalPredicate = SnapshotsInProgress.get(initialState)::isNodeIdForRemoval;
-this.rerouteRunnable = new RunOnce(rerouteRunnable); // RunOnce to avoid enqueueing O(#shards) listeners
+this.completionHandler = completionHandler;
this.updatesByRepo = new HashMap<>();
for (final var taskContext : batchExecutionContext.taskContexts()) {
if (taskContext.getTask() instanceof ShardSnapshotUpdate task) {
@@ -3159,7 +3169,11 @@ SnapshotsInProgress computeUpdatedState() {
}
final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(oldEntries.size());
for (SnapshotsInProgress.Entry entry : oldEntries) {
-newEntries.add(applyToEntry(entry, updates.getValue()));
+final var newEntry = applyToEntry(entry, updates.getValue());
+newEntries.add(newEntry);
+if (newEntry != entry && newEntry.state().completed()) {
+newlyCompletedEntries.add(newEntry);

Comment on lines +3170 to +3172 (Member):

I could use help to better understand two pieces of information.

  1. How come a shard update for one snapshot ends up updating the state of another snapshot?
  2. How come a ShardSnapshotUpdate processed unassigned shards before SnapshotsService#processExternalChanges did?

Using this particular failure as an example, I think the answer to the first question is the following sequence of calls when processing shard updates for snapshot-clone-11:

tryStartNextTaskAfterCloneUpdated(update.repoShardId, update.updatedState);
tryStartSnapshotAfterCloneFinish(repoShardId, updatedState.generation());
startShardSnapshot(repoShardId, generation);
final ShardSnapshotStatus shardSnapshotStatus = initShardSnapshotStatus(generation, shardRouting, nodeIdRemovalPredicate);

In initShardSnapshotStatus, we found out that the shard is unassigned and marked its state as MISSING, which in turn leads to snapshot-partial-12 being marked as SUCCESS.

Is this correct?

Assuming the above is more or less correct, now for question 2: I don't understand how the unassigned shard was not processed by processExternalChanges, which is called inside SnapshotsService#applyClusterState. This is where my lack of knowledge of cluster coordination hurts ... I thought cluster state update and apply run strictly one after the other, i.e. after a cluster state update, the apply runs before computing another update. In the regular case, node disconnection is handled by processExternalChanges, which ends snapshots based on the missing shards. How come ShardSnapshotUpdate sees the missing shard first in this case? You said

> while that update was in flight the node left the cluster so the other shard snapshots moved to state MISSING in the same update

This seems to say that while ShardSnapshotUpdate was running, the cluster state changed under it? Shard updates use a dedicated master task queue, so I assume they cannot be processed in the same batch as a node-left event? Sorry, this all sounds pretty silly. I must have some serious/key misunderstanding somewhere. I'd appreciate it if you could help clarify the sequence/order of events here. Thanks!

Comment (Contributor Author):

> In initShardSnapshotStatus, we found out that the shard is unassigned and marked its state as MISSING, which in turn leads to snapshot-partial-12 being marked as SUCCESS.

Correct.

> I don't understand how the unassigned shard was not processed by processExternalChanges

Because when processExternalChanges was called, [index-0][1] was still in state INIT in snapshot-clone-11, and clones don't run on the data nodes so they don't get cancelled on a node-left event, which means that the subsequent operations on [index-0][1] also remain unchanged (in state QUEUED). But then when that shard clone completes it moves to SUCCESS, and the later QUEUED states all move to MISSING via the process you describe.

Comment (Member):

We synced in a separate channel and it helped me understand why snapshot-partial-12 did not get finished during processExternalChanges: (1) the shard did not get marked as a knownFailure since it was part of a clone, and (2) snapshot-partial-12 at that time still had the shard as UNASSIGNED_QUEUED, which does not have a nodeId and hence does not respond to the node leaving.

+}
}
updated = updated.withUpdatedEntriesForRepo(repoName, newEntries);
}
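The discussion above turns on the identity check `newEntry != entry`: `applyToEntry` returns the same instance when a batch of updates leaves an entry untouched, so reference inequality plus `state().completed()` cheaply collects exactly the entries that this batch finished. A minimal, self-contained sketch of the idiom, using hypothetical stand-in types rather than the real SnapshotsInProgress classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SnapshotsInProgress.Entry: immutable, and an
// update that changes nothing returns the identical instance.
record Entry(String snapshot, boolean completed) {
    Entry applyUpdate(String completedSnapshot) {
        return snapshot.equals(completedSnapshot) ? new Entry(snapshot, true) : this;
    }
}

class NewlyCompletedDemo {
    public static void main(String[] args) {
        List<Entry> oldEntries = List.of(new Entry("snap-1", false), new Entry("snap-2", false));
        List<Entry> newEntries = new ArrayList<>();
        List<Entry> newlyCompleted = new ArrayList<>();
        for (Entry entry : oldEntries) {
            Entry newEntry = entry.applyUpdate("snap-2");
            newEntries.add(newEntry);
            // Reference inequality: only entries this batch actually rebuilt qualify,
            // so entries that were already complete beforehand are not finalized twice.
            if (newEntry != entry && newEntry.completed()) {
                newlyCompleted.add(newEntry);
            }
        }
        System.out.println(newlyCompleted); // [Entry[snapshot=snap-2, completed=true]]
    }
}
```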
@@ -3178,12 +3192,21 @@
void completeWithUpdatedState(SnapshotsInProgress snapshotsInProgress) {
if (updatesByRepo.isEmpty() == false) {
final var result = new ShardSnapshotUpdateResult(initialState.metadata(), snapshotsInProgress);
-for (final var taskContext : batchExecutionContext.taskContexts()) {
-if (taskContext.getTask() instanceof ShardSnapshotUpdate task) {
-taskContext.success(() -> {
-rerouteRunnable.run();
-task.listener.onResponse(result);
-});
+try (
+var onCompletionRefs = new RefCountingRunnable(
+() -> completionHandler.handleCompletion(result, needsReroute, newlyCompletedEntries, updatesByRepo.keySet())
+)
+) {
+for (final var taskContext : batchExecutionContext.taskContexts()) {
+if (taskContext.getTask() instanceof ShardSnapshotUpdate task) {
+final var ref = onCompletionRefs.acquire();
+needsReroute = true;

Comment (Member):
Nit: I think needsReroute must be true now since this is inside the updatesByRepo.isEmpty() == false check. We may not even need it, since completionHandler is only called when there is some update.

Comment (Contributor Author):

Ah nice, good point. I found that flag irksome indeed, dropped in a0db49e.

+taskContext.success(() -> {
+try (ref) {
+task.listener.onResponse(result);
+}
+});
+}
}
}
}
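The try-with-resources block above is the heart of the fix: each task listener holds a ref, and the completion handler runs exactly once, only after every listener in the batch has been invoked. Below is a rough sketch of the ref-counting idea, as an illustrative re-implementation under assumed semantics rather than Elasticsearch's actual RefCountingRunnable:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// A Closeable that does not throw, so it works cleanly in try-with-resources.
interface Releasable extends AutoCloseable {
    @Override
    void close();
}

// Illustrative stand-in for org.elasticsearch.action.support.RefCountingRunnable.
class RefCounting implements Releasable {
    private final AtomicInteger refs = new AtomicInteger(1); // one ref held by the opener
    private final Runnable onCompletion;

    RefCounting(Runnable onCompletion) {
        this.onCompletion = onCompletion;
    }

    Releasable acquire() {
        refs.incrementAndGet();
        return this::release;
    }

    private void release() {
        if (refs.decrementAndGet() == 0) {
            onCompletion.run(); // runs exactly once, after the opener and every acquirer release
        }
    }

    @Override
    public void close() {
        release();
    }
}

class DeferredCompletionDemo {
    public static void main(String[] args) {
        List<Runnable> listeners = new ArrayList<>();
        try (RefCounting refCounting = new RefCounting(() -> System.out.println("completion handler"))) {
            for (int i = 0; i < 3; i++) {
                Releasable ref = refCounting.acquire(); // one ref per task listener
                int task = i;
                listeners.add(() -> {
                    try (ref) {
                        System.out.println("listener " + task);
                    }
                });
            }
        } // the opener's ref is released here, but three listener refs remain
        listeners.forEach(Runnable::run); // "completion handler" prints after listener 2
    }
}
```

This also shows why the old RunOnce-based reroute could be dropped: instead of enqueueing one runnable per shard update and deduplicating, the whole batch shares a single completion action that fires after the last listener releases its ref.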
@@ -3432,6 +3455,39 @@ private ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder() {
*/
record ShardSnapshotUpdateResult(Metadata metadata, SnapshotsInProgress snapshotsInProgress) {}

+interface ShardSnapshotUpdateCompletionHandler {
+void handleCompletion(
+ShardSnapshotUpdateResult shardSnapshotUpdateResult,
+boolean needsReroute,
+List<SnapshotsInProgress.Entry> newlyCompletedEntries,
+Set<String> updatedRepositories
+);
+}
+
+private void handleShardSnapshotUpdateCompletion(
+ShardSnapshotUpdateResult shardSnapshotUpdateResult,
+boolean needsReroute,
+List<SnapshotsInProgress.Entry> newlyCompletedEntries,
+Set<String> updatedRepositories
+) {
+// Maybe this state update completed one or more snapshots. If we are not already ending them because of some earlier update, end
+// them now.
+final var snapshotsInProgress = shardSnapshotUpdateResult.snapshotsInProgress();
+for (final var newlyCompletedEntry : newlyCompletedEntries) {
+if (endingSnapshots.contains(newlyCompletedEntry.snapshot()) == false) {
+endSnapshot(newlyCompletedEntry, shardSnapshotUpdateResult.metadata(), null);
+}
+}
+// Likewise this state update may enable some new shard clones on any affected repository, so check them all.
+for (final var updatedRepository : updatedRepositories) {
+startExecutableClones(snapshotsInProgress, updatedRepository);
+}
+// Also shard snapshot completions may free up some shards to move to other nodes so we must trigger a reroute.
+if (needsReroute) {
+rerouteService.reroute("after shards snapshot update", Priority.NORMAL, ActionListener.noop());
+}
+}

/**
* An update to the snapshot state of a shard.
*
@@ -3511,23 +3567,13 @@ private void innerUpdateSnapshotState(
ShardSnapshotStatus updatedState,
ActionListener<Void> listener
) {
-var update = new ShardSnapshotUpdate(snapshot, shardId, repoShardId, updatedState, listener.delegateFailure((delegate, result) -> {
-try {
-delegate.onResponse(null);
-} finally {
-// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
-// state update we check if its state is completed and end it if it is.
-final SnapshotsInProgress snapshotsInProgress = result.snapshotsInProgress();
-if (endingSnapshots.contains(snapshot) == false) {
-final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(snapshot);
-// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
-if (updatedEntry != null && updatedEntry.state().completed()) {
-endSnapshot(updatedEntry, result.metadata(), null);
-}
-}
-startExecutableClones(snapshotsInProgress, snapshot.getRepository());
-}
-}));
+var update = new ShardSnapshotUpdate(
+snapshot,
+shardId,
+repoShardId,
+updatedState,
+listener.delegateFailure((delegate, result) -> delegate.onResponse(null))
+);
logger.trace("received updated snapshot restore state [{}]", update);
masterServiceTaskQueue.submitTask("update snapshot state", update, null);
}
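With the completion logic moved to the batch-level handler, the per-update listener only acknowledges the update. For readers unfamiliar with the delegateFailure idiom, here is a rough sketch of its contract on a hypothetical minimal listener type (the real ActionListener API has more to it): failures pass through to the wrapped listener unchanged, while responses are handed to the given consumer together with that listener.

```java
import java.util.function.BiConsumer;

// Hypothetical minimal stand-in for org.elasticsearch.action.ActionListener.
interface Listener<R> {
    void onResponse(R response);

    void onFailure(Exception e);

    // Wraps this listener: successes go through the consumer, failures pass straight through.
    default <T> Listener<T> delegateFailure(BiConsumer<Listener<R>, T> onSuccess) {
        Listener<R> delegate = this;
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                onSuccess.accept(delegate, response);
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e);
            }
        };
    }
}

class DelegateDemo {
    public static void main(String[] args) {
        Listener<Void> ack = new Listener<Void>() {
            @Override
            public void onResponse(Void response) {
                System.out.println("acknowledged");
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        };
        // Adapts a String-typed result into the Void acknowledgement, discarding the result.
        Listener<String> update = ack.delegateFailure((delegate, result) -> delegate.onResponse(null));
        update.onResponse("some-result"); // prints "acknowledged"
    }
}
```

So `listener.delegateFailure((delegate, result) -> delegate.onResponse(null))` adapts the `ShardSnapshotUpdateResult` produced by the task executor into the plain `Void` acknowledgement the caller expects, discarding the result itself.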
@@ -3807,7 +3853,7 @@ public ClusterState execute(BatchExecutionContext<SnapshotTask> batchExecutionCo
final ClusterState state = batchExecutionContext.initialState();
final SnapshotShardsUpdateContext shardsUpdateContext = new SnapshotShardsUpdateContext(
batchExecutionContext,
-() -> rerouteService.reroute("after shards snapshot update", Priority.NORMAL, ActionListener.noop())
+shardSnapshotUpdateCompletionHandler
);
final SnapshotsInProgress initialSnapshots = SnapshotsInProgress.get(state);
SnapshotsInProgress snapshotsInProgress = shardsUpdateContext.computeUpdatedState();
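Putting the pieces together: each batch of ShardSnapshotUpdate tasks now produces one cluster-state update and one handler invocation covering finalization, clone starts, and the reroute. A small sketch of that wiring, with hypothetical names standing in for the master-service plumbing:

```java
import java.util.List;
import java.util.Set;

// Hypothetical functional interface mirroring ShardSnapshotUpdateCompletionHandler.
@FunctionalInterface
interface CompletionHandler {
    void handleCompletion(List<String> newlyCompletedSnapshots, Set<String> updatedRepositories, boolean needsReroute);
}

class BatchWiringDemo {
    // Bound once to a method reference, like the new shardSnapshotUpdateCompletionHandler field.
    private final CompletionHandler handler = this::handleCompletion;

    private void handleCompletion(List<String> newlyCompletedSnapshots, Set<String> updatedRepositories, boolean needsReroute) {
        newlyCompletedSnapshots.forEach(s -> System.out.println("finalize " + s));
        updatedRepositories.forEach(r -> System.out.println("start executable clones in " + r));
        if (needsReroute) {
            System.out.println("reroute once for the whole batch");
        }
    }

    void afterBatch() {
        // One call per task batch, rather than one runnable per shard update.
        handler.handleCompletion(List.of("snap-2"), Set.of("repo-1"), true);
    }

    public static void main(String[] args) {
        new BatchWiringDemo().afterBatch();
    }
}
```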