diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 73be2ea006656..7308d471afb9d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -24,7 +24,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -94,11 +93,9 @@ public static class Entry { private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; - @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure) { + long startTime, long repositoryStateId, ImmutableOpenMap shards) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -113,26 +110,15 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta this.waitingIndices = findWaitingIndices(shards); } this.repositoryStateId = repositoryStateId; - this.failure = failure; - } - - public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, long repositoryStateId, ImmutableOpenMap shards) { - this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, entry.failure); - } - - public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { - this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure); + entry.repositoryStateId, shards); } public Entry(Entry entry, ImmutableOpenMap shards) { - this(entry, entry.state, shards, entry.failure); + this(entry, entry.state, shards); } public Snapshot snapshot() { @@ -171,10 +157,6 @@ public long getRepositoryStateId() { return repositoryStateId; } - public String failure() { - return failure; - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -455,12 +437,6 @@ public SnapshotsInProgress(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) { repositoryStateId = in.readLong(); } - final String failure; - if (in.getVersion().onOrAfter(Version.V_6_7_0)) { - failure = in.readOptionalString(); - } else { - failure = null; - } entries[i] = new Entry(snapshot, includeGlobalState, partial, @@ -468,8 +444,7 @@ public SnapshotsInProgress(StreamInput in) throws IOException { Collections.unmodifiableList(indexBuilder), startTime, repositoryStateId, - builder.build(), - failure); + builder.build()); } this.entries = Arrays.asList(entries); } @@ -501,9 +476,6 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) { out.writeLong(entry.repositoryStateId); } - if (out.getVersion().onOrAfter(Version.V_6_7_0)) { - out.writeOptionalString(entry.failure); - } } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotException.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotException.java index 05db85d6f7211..d389ed634f3af 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotException.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotException.java @@ -51,6 +51,10 @@ public SnapshotException(final Snapshot snapshot, final String msg, final Throwa } } + public SnapshotException(final String repositoryName, final SnapshotId snapshotId, final String msg) { + this(repositoryName, snapshotId, msg, null); + } + public SnapshotException(final String repositoryName, final SnapshotId snapshotId, final String msg, final Throwable cause) { super("[" + repositoryName + ":" + snapshotId + "] " + msg, cause); this.repositoryName = repositoryName; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 116a3f45b0087..3f1cf1db32807 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -69,26 +69,26 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestDeduplicator; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; import static org.elasticsearch.transport.EmptyTransportResponseHandler.INSTANCE_SAME; @@ -114,11 +114,11 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final ThreadPool threadPool; - private final Map> shardSnapshots = new HashMap<>(); + private final Lock shutdownLock = new ReentrantLock(); - // A map of snapshots to the shardIds that we already reported to the master as failed - private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = - new TransportRequestDeduplicator<>(); + private final Condition shutdownCondition = shutdownLock.newCondition(); + + private volatile Map> shardSnapshots = emptyMap(); private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; @@ -139,7 +139,7 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S } // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. - this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(settings, UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); if (DiscoveryNode.isMasterNode(settings)) { @@ -147,6 +147,7 @@ public SnapshotShardsService(Settings settings, ClusterService clusterService, S transportService.registerRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, UpdateSnapshotStatusRequestV6::new, ThreadPool.Names.SAME, new UpdateSnapshotStateRequestHandlerV6()); } + } @Override @@ -160,6 +161,16 @@ protected void doStart() { @Override protected void doStop() { + shutdownLock.lock(); + try { + while(!shardSnapshots.isEmpty() && shutdownCondition.await(5, TimeUnit.SECONDS)) { + // Wait for at most 5 second for locally running snapshots to finish + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } finally { + shutdownLock.unlock(); + } } @Override @@ -174,9 +185,7 @@ public void clusterChanged(ClusterChangedEvent event) { SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE); if ((previousSnapshots == null && currentSnapshots != null) || (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) { - synchronized (shardSnapshots) { - processIndexShardSnapshots(currentSnapshots, event.state().nodes().getMasterNode()); - } + processIndexShardSnapshots(event); } String previousMasterNodeId = event.previousState().nodes().getMasterNodeId(); @@ -193,14 +202,13 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { // abort any snapshots occurring on the soon-to-be closed shard - synchronized (shardSnapshots) { - for (Map.Entry> snapshotShards : shardSnapshots.entrySet()) { - Map shards = snapshotShards.getValue(); - if (shards.containsKey(shardId)) { - logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", - shardId, snapshotShards.getKey().getSnapshotId()); - shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); - } + Map> snapshotShardsMap = shardSnapshots; + for (Map.Entry> snapshotShards : snapshotShardsMap.entrySet()) { + Map shards = snapshotShards.getValue(); + if (shards.containsKey(shardId)) { + logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", + shardId, snapshotShards.getKey().getSnapshotId()); + shards.get(shardId).abortIfNotCompleted("shard is closing, aborting"); } } } @@ -215,146 +223,163 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh * @return map of shard id to snapshot status */ public Map currentSnapshotShards(Snapshot snapshot) { - synchronized (shardSnapshots) { - final Map current = shardSnapshots.get(snapshot); - return current == null ? null : new HashMap<>(current); - } + return shardSnapshots.get(snapshot); } /** * Checks if any new shards should be snapshotted on this node * - * @param snapshotsInProgress Current snapshots in progress in cluster state + * @param event cluster state changed event */ - private void processIndexShardSnapshots(SnapshotsInProgress snapshotsInProgress, DiscoveryNode masterNode) { - cancelRemoved(snapshotsInProgress); - if (snapshotsInProgress != null) { - startNewSnapshots(snapshotsInProgress, masterNode); - } - } - - private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) { + private void processIndexShardSnapshots(ClusterChangedEvent event) { + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + Map> survivors = new HashMap<>(); // First, remove snapshots that are no longer there - Iterator>> it = shardSnapshots.entrySet().iterator(); - while (it.hasNext()) { - final Map.Entry> entry = it.next(); + for (Map.Entry> entry : shardSnapshots.entrySet()) { final Snapshot snapshot = entry.getKey(); - if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) { + if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { + survivors.put(entry.getKey(), entry.getValue()); + } else { // abort any running snapshots of shards for the removed entry; // this could happen if for some reason the cluster state update for aborting // running shards is missed, then the snapshot is removed is a subsequent cluster // state update, which is being processed here - it.remove(); for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().values()) { snapshotStatus.abortIfNotCompleted("snapshot has been removed in cluster state, aborting"); } } } - } - private void startNewSnapshots(SnapshotsInProgress snapshotsInProgress, DiscoveryNode masterNode) { // For now we will be mostly dealing with a single snapshot at a time but might have multiple simultaneously running // snapshots in the future + Map> newSnapshots = new HashMap<>(); // Now go through all snapshots and update existing or create missing - final String localNodeId = clusterService.localNode().getId(); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - final State entryState = entry.state(); - if (entryState == State.STARTED) { - Map startedShards = null; - final Snapshot snapshot = entry.snapshot(); - Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); - for (ObjectObjectCursor shard : entry.shards()) { - // Add all new shards to start processing on - final ShardId shardId = shard.key; - final ShardSnapshotStatus shardSnapshotStatus = shard.value; - if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT - && snapshotShards.containsKey(shardId) == false) { - logger.trace("[{}] - Adding shard to the queue", shardId); - if (startedShards == null) { - startedShards = new HashMap<>(); + final String localNodeId = event.state().nodes().getLocalNodeId(); + final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); + final Map> snapshotIndices = new HashMap<>(); + if (snapshotsInProgress != null) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + snapshotIndices.put(entry.snapshot(), + entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity()))); + if (entry.state() == State.STARTED) { + Map startedShards = new HashMap<>(); + Map snapshotShards = shardSnapshots.get(entry.snapshot()); + for (ObjectObjectCursor shard : entry.shards()) { + // Add all new shards to start processing on + if (localNodeId.equals(shard.value.nodeId())) { + if (shard.value.state() == State.INIT && (snapshotShards == null || !snapshotShards.containsKey(shard.key))) { + logger.trace("[{}] - Adding shard to the queue", shard.key); + startedShards.put(shard.key, IndexShardSnapshotStatus.newInitializing()); + } } - startedShards.put(shardId, IndexShardSnapshotStatus.newInitializing()); } - } - if (startedShards != null && startedShards.isEmpty() == false) { - shardSnapshots.computeIfAbsent(snapshot, s -> new HashMap<>()).putAll(startedShards); - startNewShards(entry, startedShards, masterNode); - } - } else if (entryState == State.ABORTED) { - // Abort all running shards for this snapshot - final Snapshot snapshot = entry.snapshot(); - Map snapshotShards = shardSnapshots.getOrDefault(snapshot, emptyMap()); - for (ObjectObjectCursor shard : entry.shards()) { - final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); - if (snapshotStatus != null) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = - snapshotStatus.abortIfNotCompleted("snapshot has been aborted"); - final Stage stage = lastSnapshotStatus.getStage(); - if (stage == Stage.FINALIZE) { - logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + - "letting it finish", snapshot, shard.key); - } else if (stage == Stage.DONE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + - "updating status on the master", snapshot, shard.key); - notifySuccessfulSnapshotShard(snapshot, shard.key, masterNode); - } else if (stage == Stage.FAILURE) { - logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + - "updating status on the master", snapshot, shard.key); - notifyFailedSnapshotShard(snapshot, shard.key, lastSnapshotStatus.getFailure(), masterNode); + if (!startedShards.isEmpty()) { + newSnapshots.put(entry.snapshot(), startedShards); + if (snapshotShards != null) { + // We already saw this snapshot but we need to add more started shards + Map shards = new HashMap<>(); + // Put all shards that were already running on this node + shards.putAll(snapshotShards); + // Put all newly started shards + shards.putAll(startedShards); + survivors.put(entry.snapshot(), unmodifiableMap(shards)); + } else { + // Brand new snapshot that we haven't seen before + survivors.put(entry.snapshot(), unmodifiableMap(startedShards)); } - } else { - // due to CS batching we might have missed the INIT state and straight went into ABORTED - // notify master that abort has completed by moving to FAILED - if (shard.value.state() == State.ABORTED) { - notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason(), masterNode); + } + } else if (entry.state() == State.ABORTED) { + // Abort all running shards for this snapshot + Map snapshotShards = shardSnapshots.get(entry.snapshot()); + if (snapshotShards != null) { + final String failure = "snapshot has been aborted"; + for (ObjectObjectCursor shard : entry.shards()) { + final IndexShardSnapshotStatus snapshotStatus = snapshotShards.get(shard.key); + if (snapshotStatus != null) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.abortIfNotCompleted(failure); + final Stage stage = lastSnapshotStatus.getStage(); + if (stage == Stage.FINALIZE) { + logger.debug("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " + + "letting it finish", entry.snapshot(), shard.key); + + } else if (stage == Stage.DONE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, " + + "updating status on the master", entry.snapshot(), shard.key); + notifySuccessfulSnapshotShard(entry.snapshot(), shard.key, localNodeId, masterNode); + + } else if (stage == Stage.FAILURE) { + logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " + + "updating status on the master", entry.snapshot(), shard.key); + final String snapshotFailure = lastSnapshotStatus.getFailure(); + notifyFailedSnapshotShard(entry.snapshot(), shard.key, localNodeId, snapshotFailure, masterNode); + } + } } } } } } - } - private void startNewShards(SnapshotsInProgress.Entry entry, Map startedShards, - DiscoveryNode masterNode) { - final Snapshot snapshot = entry.snapshot(); - final Map indicesMap = entry.indices().stream().collect(Collectors.toMap(IndexId::getName, Function.identity())); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (final Map.Entry shardEntry : startedShards.entrySet()) { - final ShardId shardId = shardEntry.getKey(); - final IndexId indexId = indicesMap.get(shardId.getIndexName()); - assert indexId != null; - executor.execute(new AbstractRunnable() { + // Update the list of snapshots that we saw and tried to started + // If startup of these shards fails later, we don't want to try starting these shards again + shutdownLock.lock(); + try { + shardSnapshots = unmodifiableMap(survivors); + if (shardSnapshots.isEmpty()) { + // Notify all waiting threads that no more snapshots + shutdownCondition.signalAll(); + } + } finally { + shutdownLock.unlock(); + } - private final SetOnce failure = new SetOnce<>(); + // We have new shards to starts + if (newSnapshots.isEmpty() == false) { + Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + for (final Map.Entry> entry : newSnapshots.entrySet()) { + final Snapshot snapshot = entry.getKey(); + final Map indicesMap = snapshotIndices.get(snapshot); + assert indicesMap != null; - @Override - public void doRun() { - final IndexShard indexShard = - indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); - snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); - } + for (final Map.Entry shardEntry : entry.getValue().entrySet()) { + final ShardId shardId = shardEntry.getKey(); + final IndexId indexId = indicesMap.get(shardId.getIndexName()); + executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - failure.set(e); - } + final SetOnce failure = new SetOnce<>(); - @Override - public void onRejection(Exception e) { - failure.set(e); - } + @Override + public void doRun() { + final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); + assert indexId != null; + snapshot(indexShard, snapshot, indexId, shardEntry.getValue()); + } - @Override - public void onAfter() { - final Exception exception = failure.get(); - if (exception != null) { - notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(exception), masterNode); - } else { - notifySuccessfulSnapshotShard(snapshot, shardId, masterNode); - } + @Override + public void onFailure(Exception e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", + shardId, snapshot), e); + failure.set(e); + } + + @Override + public void onRejection(Exception e) { + failure.set(e); + } + + @Override + public void onAfter() { + final Exception exception = failure.get(); + if (exception != null) { + final String failure = ExceptionsHelper.detailedMessage(exception); + notifyFailedSnapshotShard(snapshot, shardId, localNodeId, failure, masterNode); + } else { + notifySuccessfulSnapshotShard(snapshot, shardId, localNodeId, masterNode); + } + } + }); } - }); + } } } @@ -407,6 +432,8 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { if (snapshotsInProgress == null) { return; } + + final String localNodeId = event.state().nodes().getLocalNodeId(); final DiscoveryNode masterNode = event.state().nodes().getMasterNode(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { @@ -415,6 +442,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { ImmutableOpenMap masterShards = snapshot.shards(); for(Map.Entry localShard : localShards.entrySet()) { ShardId shardId = localShard.getKey(); + IndexShardSnapshotStatus localShardStatus = localShard.getValue(); ShardSnapshotStatus masterShard = masterShards.get(shardId); if (masterShard != null && masterShard.state().completed() == false) { final IndexShardSnapshotStatus.Copy indexShardSnapshotStatus = localShard.getValue().asCopy(); @@ -424,13 +452,14 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " + "updating status on the master", snapshot.snapshot(), shardId); - notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, masterNode); + notifySuccessfulSnapshotShard(snapshot.snapshot(), shardId, localNodeId, masterNode); } else if (stage == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " + "updating status on master", snapshot.snapshot(), shardId); - notifyFailedSnapshotShard(snapshot.snapshot(), shardId, indexShardSnapshotStatus.getFailure(), masterNode); + final String failure = indexShardSnapshotStatus.getFailure(); + notifyFailedSnapshotShard(snapshot.snapshot(), shardId, localNodeId, failure, masterNode); } } } @@ -499,64 +528,34 @@ public String toString() { } /** Notify the master node that the given shard has been successfully snapshotted **/ - private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, DiscoveryNode masterNode) { - sendSnapshotShardUpdate( - snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS), masterNode); + void notifySuccessfulSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.SUCCESS), masterNode); } /** Notify the master node that the given shard failed to be snapshotted **/ - private void notifyFailedSnapshotShard(Snapshot snapshot, ShardId shardId, String failure, DiscoveryNode masterNode) { - sendSnapshotShardUpdate( - snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure), masterNode); + void notifyFailedSnapshotShard(final Snapshot snapshot, + final ShardId shardId, + final String localNodeId, + final String failure, + final DiscoveryNode masterNode) { + sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(localNodeId, State.FAILED, failure), masterNode); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ - void sendSnapshotShardUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status, DiscoveryNode masterNode) { + void sendSnapshotShardUpdate(final Snapshot snapshot, + final ShardId shardId, + final ShardSnapshotStatus status, + final DiscoveryNode masterNode) { try { if (masterNode.getVersion().onOrAfter(Version.V_6_1_0)) { UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); transportService.sendRequest(transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, request, INSTANCE_SAME); } else { - remoteFailedRequestDeduplicator.executeOnce( - new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status), - new ActionListener() { - @Override - public void onResponse(Void aVoid) { - logger.trace("[{}] [{}] updated snapshot state", snapshot, status); - } - - @Override - public void onFailure(Exception e) { - logger.warn( - () -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); - } - }, - (req, reqListener) -> transportService.sendRequest( - transportService.getLocalNode(), UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req, - new TransportResponseHandler() { - @Override - public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - final UpdateIndexShardSnapshotStatusResponse response = new UpdateIndexShardSnapshotStatusResponse(); - response.readFrom(in); - return response; - } - - @Override - public void handleResponse(UpdateIndexShardSnapshotStatusResponse response) { - reqListener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - reqListener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }) - ); + UpdateSnapshotStatusRequestV6 requestV6 = new UpdateSnapshotStatusRequestV6(snapshot, shardId, status); + transportService.sendRequest(masterNode, UPDATE_SNAPSHOT_STATUS_ACTION_NAME_V6, requestV6, INSTANCE_SAME); } } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to update snapshot state", snapshot, status), e); @@ -589,11 +588,11 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private class SnapshotStateExecutor implements ClusterStateTaskExecutor { + class SnapshotStateExecutor implements ClusterStateTaskExecutor { @Override public ClusterTasksResult - execute(ClusterState currentState, List tasks) { + execute(ClusterState currentState, List tasks) throws Exception { final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots != null) { int changedCount = 0; @@ -623,6 +622,8 @@ private class SnapshotStateExecutor implements ClusterStateTaskExecutor 0) { logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - return ClusterTasksResult.builder().successes(tasks) - .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, - new SnapshotsInProgress(unmodifiableList(entries))).build()); + + final SnapshotsInProgress updatedSnapshots = + new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); } } return ClusterTasksResult.builder().successes(tasks).build(currentState); @@ -643,14 +646,13 @@ static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { } - private class UpdateSnapshotStatusAction - extends TransportMasterNodeAction { - UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super( - settings, SnapshotShardsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, transportService, clusterService, threadPool, - actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new - ); + class UpdateSnapshotStatusAction extends + TransportMasterNodeAction { + UpdateSnapshotStatusAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, actionName, transportService, clusterService, threadPool, + actionFilters, indexNameExpressionResolver, UpdateIndexShardSnapshotStatusRequest::new); } @Override @@ -665,7 +667,7 @@ protected UpdateIndexShardSnapshotStatusResponse newResponse() { @Override protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, - ActionListener listener) { + ActionListener listener) throws Exception { innerUpdateSnapshotState(request, listener); } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 998ab2a38639b..c7bf91b476c5b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -83,9 +83,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import static java.util.Collections.unmodifiableList; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; @@ -100,9 +98,9 @@ * the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes * the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state *
  • Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes - * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots} method
  • + * start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(ClusterChangedEvent)} method *
  • Once shard snapshot is created data node updates state of the shard in the cluster state using - * the {@link SnapshotShardsService#sendSnapshotShardUpdate} method
  • + * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus, DiscoveryNode)} method *
  • When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot * as completed
  • *
  • After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, @@ -123,12 +121,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); - // Set of snapshots that are currently being initialized by this node - private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); - - // Set of snapshots that are currently being ended by this node - private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); - @Inject public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, RepositoriesService repositoriesService, ThreadPool threadPool) { @@ -215,7 +207,7 @@ public List snapshots(final String repositoryName, } final ArrayList snapshotList = new ArrayList<>(snapshotSet); CollectionUtil.timSort(snapshotList); - return unmodifiableList(snapshotList); + return Collections.unmodifiableList(snapshotList); } /** @@ -231,7 +223,7 @@ public List currentSnapshots(final String repositoryName) { snapshotList.add(inProgressSnapshot(entry)); } CollectionUtil.timSort(snapshotList); - return unmodifiableList(snapshotList); + return Collections.unmodifiableList(snapshotList); } /** @@ -277,7 +269,7 @@ public ClusterState execute(ClusterState currentState) { if (snapshots == null || snapshots.entries().isEmpty()) { // Store newSnapshot here to be processed in clusterStateProcessed List indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, - request.indicesOptions(), request.indices())); + request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); List snapshotIndices = repositoryData.resolveNewIndices(indices); newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId), @@ -288,7 +280,6 @@ public ClusterState execute(ClusterState currentState) { System.currentTimeMillis(), repositoryData.getGenId(), null); - initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); } else { throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running"); @@ -299,9 +290,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); - } newSnapshot = null; listener.onFailure(e); } @@ -309,21 +297,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); - assert initializingSnapshots.contains(current); - beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener() { - @Override - public void onResponse(final Snapshot snapshot) { - initializingSnapshots.remove(snapshot); - listener.onResponse(snapshot); - } - - @Override - public void onFailure(final Exception e) { - initializingSnapshots.remove(current); - listener.onFailure(e); - } - }); + beginSnapshot(newState, newSnapshot, request.partial(), listener); } } @@ -331,6 +305,7 @@ public void onFailure(final Exception e) { public TimeValue timeout() { return request.masterNodeTimeout(); } + }); } @@ -393,11 +368,8 @@ private void beginSnapshot(final ClusterState clusterState, boolean snapshotCreated; - boolean hadAbortedInitializations; - @Override protected void doRun() { - assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); MetaData metaData = clusterState.metaData(); @@ -422,6 +394,9 @@ protected void doRun() { } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { + SnapshotsInProgress.Entry endSnapshot; + String failure; + @Override public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); @@ -432,13 +407,9 @@ public ClusterState execute(ClusterState currentState) { continue; } - if (entry.state() == State.ABORTED) { - entries.add(entry); - assert entry.shards().isEmpty(); - hadAbortedInitializations = true; - } else { - // Replace the snapshot that was just initialized - ImmutableOpenMap shards = + if (entry.state() != State.ABORTED) { + // Replace the snapshot that was just intialized + ImmutableOpenMap shards = shards(currentState, entry.indices()); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, @@ -446,6 +417,9 @@ public ClusterState execute(ClusterState currentState) { Set missing = indicesWithMissingShards.v1(); Set closed = indicesWithMissingShards.v2(); if (missing.isEmpty() == false || closed.isEmpty() == false) { + endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards); + entries.add(endSnapshot); + final StringBuilder failureMessage = new StringBuilder(); if (missing.isEmpty() == false) { failureMessage.append("Indices don't have primary shards "); @@ -458,15 +432,24 @@ public ClusterState execute(ClusterState currentState) { failureMessage.append("Indices are closed "); failureMessage.append(closed); } - entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString())); + failure = failureMessage.toString(); continue; } } - entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards)); + SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards); + entries.add(updatedSnapshot); + if (completed(shards.values())) { + endSnapshot = updatedSnapshot; + } + } else { + assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization"; + failure = "snapshot was aborted during initialization"; + endSnapshot = entry; + entries.add(endSnapshot); } } return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))) + .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))) .build(); } @@ -495,12 +478,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS // should still exist when listener is registered. userCreateSnapshotListener.onResponse(snapshot.snapshot()); - if (hadAbortedInitializations) { - final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); - assert snapshotsInProgress != null; - final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); - assert entry != null; - endSnapshot(entry); + // Now that snapshot completion listener is registered we can end the snapshot if needed + // We should end snapshot only if 1) we didn't accept it for processing (which happens when there + // is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should + // go ahead and continue working on this snapshot rather then end here. + if (endSnapshot != null) { + endSnapshot(endSnapshot, failure); } } }); @@ -542,7 +525,7 @@ public void onFailure(Exception e) { cleanupAfterError(e); } - public void onNoLongerMaster() { + public void onNoLongerMaster(String source) { userCreateSnapshotListener.onFailure(e); } @@ -569,7 +552,7 @@ private void cleanupAfterError(Exception exception) { } - private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { + private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), entry.startTime(), entry.includeGlobalState()); @@ -627,7 +610,7 @@ public List currentSnapshots(final String repository, builder.add(entry); } } - return unmodifiableList(builder); + return Collections.unmodifiableList(builder); } /** @@ -683,7 +666,7 @@ public Map snapshotShards(final String reposi return unmodifiableMap(shardStatus); } - private static SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { + private SnapshotShardFailure findShardFailure(List shardFailures, ShardId shardId) { for (SnapshotShardFailure shardFailure : shardFailures) { if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) { return shardFailure; @@ -697,28 +680,14 @@ public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { // We don't remove old master when master flips anymore. So, we need to check for change in master - final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; - if (snapshotsInProgress != null) { - if (newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) { - processSnapshotsOnRemovedNodes(); - } - if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { - processStartedShards(); - } - // Cleanup all snapshots that have no more work left: - // 1. Completed snapshots - // 2. Snapshots in state INIT that the previous master failed to start - // 3. Snapshots in any other state that have all their shard tasks completed - snapshotsInProgress.entries().stream().filter( - entry -> entry.state().completed() - || initializingSnapshots.contains(entry.snapshot()) == false - && (entry.state() == State.INIT || completed(entry.shards().values())) - ).forEach(this::endSnapshot); + if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) { + processSnapshotsOnRemovedNodes(event); } - if (newMaster) { - finalizeSnapshotDeletionFromPreviousMaster(event); + if (event.routingTableChanged()) { + processStartedShards(event); } + removeFinishedSnapshotFromClusterState(event); + finalizeSnapshotDeletionFromPreviousMaster(event); } } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); @@ -737,134 +706,166 @@ public void applyClusterState(ClusterChangedEvent event) { * snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists. */ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) { - SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); - if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { - assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; - SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; + SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + } } } /** - * Cleans up shard snapshots that were running on removed nodes + * Removes a finished snapshot from the cluster state. This can happen if the previous + * master node processed a cluster state update that marked the snapshot as finished, + * but the previous master node died before removing the snapshot in progress from the + * cluster state. It is then the responsibility of the new master node to end the + * snapshot and remove it from the cluster state. */ - private void processSnapshotsOnRemovedNodes() { - clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - DiscoveryNodes nodes = currentState.nodes(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null) { - return currentState; - } - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { - SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean snapshotChanged = false; - for (ObjectObjectCursor shardEntry : snapshot.shards()) { - ShardSnapshotStatus shardStatus = shardEntry.value; - if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardEntry.key, shardEntry.value); - } else { - // TODO: Restart snapshot on another node? - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on closed node [{}]", - shardEntry.key, shardStatus.nodeId()); - shards.put(shardEntry.key, - new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); - } - } - } - if (snapshotChanged) { - changed = true; - ImmutableOpenMap shardsMap = shards.build(); - if (!snapshot.state().completed() && completed(shardsMap.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); - } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); - } - } - entries.add(updatedSnapshot); - } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { - changed = true; - // Mark the snapshot as aborted as it failed to start from the previous master - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); - entries.add(updatedSnapshot); - - // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new ActionListener() { - @Override - public void onResponse(Void aVoid) { - logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - - @Override - public void onFailure(Exception e) { - logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - }, updatedSnapshot.getRepositoryStateId(), false); + private void removeFinishedSnapshotFromClusterState(ClusterChangedEvent event) { + if (event.localNodeMaster() && !event.previousState().nodes().isLocalNodeElectedMaster()) { + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null && !snapshotsInProgress.entries().isEmpty()) { + for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { + if (entry.state().completed()) { + endSnapshot(entry); } } - if (changed) { - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); - } - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn("failed to update snapshot state after node removal"); } - }); + } } - private void processStartedShards() { - clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - RoutingTable routingTable = currentState.routingTable(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { + /** + * Cleans up shard snapshots that were running on removed nodes + * + * @param event cluster changed event + */ + private void processSnapshotsOnRemovedNodes(ClusterChangedEvent event) { + if (removedNodesCleanupNeeded(event)) { + // Check if we just became the master + final boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); + clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + DiscoveryNodes nodes = currentState.nodes(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots == null) { + return currentState; + } boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED) { - ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), - routingTable); - if (shards != null) { + boolean snapshotChanged = false; + if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + for (ObjectObjectCursor shardEntry : snapshot.shards()) { + ShardSnapshotStatus shardStatus = shardEntry.value; + if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardEntry.key, shardEntry.value); + } else { + // TODO: Restart snapshot on another node? + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on closed node [{}]", + shardEntry.key, shardStatus.nodeId()); + shards.put(shardEntry.key, new ShardSnapshotStatus(shardStatus.nodeId(), + State.FAILED, "node shutdown")); + } + } + } + if (snapshotChanged) { changed = true; - if (!snapshot.state().completed() && completed(shards.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); + ImmutableOpenMap shardsMap = shards.build(); + if (!snapshot.state().completed() && completed(shardsMap.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); + endSnapshot(updatedSnapshot); } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); } } entries.add(updatedSnapshot); + } else if (snapshot.state() == State.INIT && newMaster) { + changed = true; + // Mark the snapshot as aborted as it failed to start from the previous master + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); + entries.add(updatedSnapshot); + + // Clean up the snapshot that failed to start from the old master + deleteSnapshot(snapshot.snapshot(), new ActionListener() { + @Override + public void onResponse(Void aVoid) { + logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); + } + }, updatedSnapshot.getRepositoryStateId(), false); } } if (changed) { - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); + snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } + return currentState; } - return currentState; - } - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> - new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); - } - }); + @Override + public void onFailure(String source, Exception e) { + logger.warn("failed to update snapshot state after node removal"); + } + }); + } } - private static ImmutableOpenMap processWaitingShards( + private void processStartedShards(ClusterChangedEvent event) { + if (waitingShardsStartedOrUnassigned(event)) { + clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + RoutingTable routingTable = currentState.routingTable(); + SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { + SnapshotsInProgress.Entry updatedSnapshot = snapshot; + if (snapshot.state() == State.STARTED) { + ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), + routingTable); + if (shards != null) { + changed = true; + if (!snapshot.state().completed() && completed(shards.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); + endSnapshot(updatedSnapshot); + } else { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); + } + } + entries.add(updatedSnapshot); + } + } + if (changed) { + snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + } + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn(() -> + new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); + } + }); + } + } + + private ImmutableOpenMap processWaitingShards( ImmutableOpenMap snapshotShards, RoutingTable routingTable) { boolean snapshotChanged = false; ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); @@ -904,16 +905,19 @@ private static ImmutableOpenMap processWaitingShar } } - private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) { - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - if (entry.state() == State.STARTED) { - for (ObjectCursor index : entry.waitingIndices().keys()) { - if (event.indexRoutingTableChanged(index.value)) { - IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); - for (ShardId shardId : entry.waitingIndices().get(index.value)) { - ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); - if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { - return true; + private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) { + SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); + if (curr != null) { + for (SnapshotsInProgress.Entry entry : curr.entries()) { + if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) { + for (ObjectCursor index : entry.waitingIndices().keys()) { + if (event.indexRoutingTableChanged(index.value)) { + IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value); + for (ShardId shardId : entry.waitingIndices().get(index.value)) { + ShardRouting shardRouting = indexShardRoutingTable.shard(shardId.id()).primaryShard(); + if (shardRouting != null && (shardRouting.started() || shardRouting.unassigned())) { + return true; + } } } } @@ -923,12 +927,28 @@ private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snap return false; } - private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { - // If at least one shard was running on a removed node - we need to fail it - return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -> - StreamSupport.stream(((Iterable) () -> snapshot.shards().valuesIt()).spliterator(), false) - .filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId)) - .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); + private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) { + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress == null) { + return false; + } + // Check if we just became the master + boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); + for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { + if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { + // We just replaced old master and snapshots in intermediate states needs to be cleaned + return true; + } + for (DiscoveryNode node : event.nodesDelta().removedNodes()) { + for (ObjectCursor shardStatus : snapshot.shards().values()) { + if (!shardStatus.value.state().completed() && node.getId().equals(shardStatus.value.nodeId())) { + // At least one shard was running on the removed node - we need to fail it + return true; + } + } + } + } + return false; } /** @@ -961,16 +981,25 @@ private Tuple, Set> indicesWithMissingShards( * * @param entry snapshot */ - private void endSnapshot(final SnapshotsInProgress.Entry entry) { - if (endingSnapshots.add(entry.snapshot()) == false) { - return; - } + void endSnapshot(final SnapshotsInProgress.Entry entry) { + endSnapshot(entry, null); + } + + + /** + * Finalizes the shard in repository and then removes it from cluster state + *

    + * This is non-blocking method that runs on a thread from SNAPSHOT thread pool + * + * @param entry snapshot + * @param failure failure reason or null if snapshot was successful + */ + private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @Override protected void doRun() { final Snapshot snapshot = entry.snapshot(); final Repository repository = repositoriesService.repository(snapshot.getRepository()); - final String failure = entry.failure(); logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure); ArrayList shardFailures = new ArrayList<>(); for (ObjectObjectCursor shardStatus : entry.shards()) { @@ -986,7 +1015,7 @@ protected void doRun() { entry.startTime(), failure, entry.shards().size(), - unmodifiableList(shardFailures), + Collections.unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); @@ -1004,7 +1033,7 @@ public void onFailure(final Exception e) { /** * Removes record of running snapshot from cluster state - * @param snapshot snapshot + * @param snapshot snapshot * @param snapshotInfo snapshot info if snapshot was successful * @param e exception if snapshot failed */ @@ -1014,11 +1043,11 @@ private void removeSnapshotFromClusterState(final Snapshot snapshot, final Snaps /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete - * @param snapshot snapshot + * @param snapshot snapshot * @param failure exception if snapshot failed * @param listener listener to notify when snapshot information is removed from the cluster state */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, final Exception failure, + private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, @Nullable CleanupAfterErrorListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @@ -1036,8 +1065,8 @@ public ClusterState execute(ClusterState currentState) { } } if (changed) { - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); + snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } } return currentState; @@ -1046,7 +1075,6 @@ public ClusterState execute(ClusterState currentState) { @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e); - endingSnapshots.remove(snapshot); if (listener != null) { listener.onFailure(e); } @@ -1054,9 +1082,8 @@ public void onFailure(String source, Exception e) { @Override public void onNoLongerMaster(String source) { - endingSnapshots.remove(snapshot); if (listener != null) { - listener.onNoLongerMaster(); + listener.onNoLongerMaster(source); } } @@ -1074,7 +1101,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.warn("Failed to notify listeners", e); } } - endingSnapshots.remove(snapshot); if (listener != null) { listener.onResponse(snapshotInfo); } @@ -1105,20 +1131,14 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam .filter(s -> s.getName().equals(snapshotName)) .findFirst(); // if nothing found by the same name, then look in the cluster state for current in progress snapshots - long repoGenId = repositoryData.getGenId(); if (matchedEntry.isPresent() == false) { - Optional matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream() - .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); - if (matchedInProgress.isPresent()) { - matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); - // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().getRepositoryStateId() + 1L; - } + matchedEntry = currentSnapshots(repositoryName, Collections.emptyList()).stream() + .map(e -> e.snapshot().getSnapshotId()).filter(s -> s.getName().equals(snapshotName)).findFirst(); } if (matchedEntry.isPresent() == false) { throw new SnapshotMissingException(repositoryName, snapshotName); } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); + deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repositoryData.getGenId(), immediatePriority); } /** @@ -1181,12 +1201,10 @@ public ClusterState execute(ClusterState currentState) throws Exception { final ImmutableOpenMap shards; final State state = snapshotEntry.state(); - final String failure; if (state == State.INIT) { // snapshot is still initializing, mark it as aborted shards = snapshotEntry.shards(); - assert shards.isEmpty(); - failure = "Snapshot was aborted during initialization"; + } else if (state == State.STARTED) { // snapshot is started - mark every non completed shard as aborted final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); @@ -1198,7 +1216,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { shardsBuilder.put(shardEntry.key, status); } shards = shardsBuilder.build(); - failure = "Snapshot was aborted by deletion"; + } else { boolean hasUncompletedShards = false; // Cleanup in case a node gone missing and snapshot wasn't updated for some reason @@ -1219,10 +1237,10 @@ public ClusterState execute(ClusterState currentState) throws Exception { // where we force to finish the snapshot logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); shards = snapshotEntry.shards(); + endSnapshot(snapshotEntry); } - failure = snapshotEntry.failure(); } - SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); + SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); } return clusterStateBuilder.build(); @@ -1373,8 +1391,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * @param indices list of indices to be snapshotted * @return list of shard to be included into current snapshot */ - private static ImmutableOpenMap shards(ClusterState clusterState, - List indices) { + private ImmutableOpenMap shards(ClusterState clusterState, List indices) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); for (IndexId index : indices) { @@ -1399,6 +1416,8 @@ private static ImmutableOpenMap