From 483386136d97823f48321d2b122b564aa4d0b3b6 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 12 Jul 2020 22:19:07 +0200 Subject: [PATCH] Move all Snapshot Master Node Steps to SnapshotsService (#56365) (#59373) This refactoring has three motivations: 1. Separate all master node steps during snapshot operations from all data node steps in code. 2. Set up next steps in concurrent repository operations and general improvements by centralizing tracking of each shard's state in the repository in `SnapshotsService` so that operations for each shard can be linearized efficiently (i.e. without having to inspect the full snapshot state for all shards on every cluster state update, allowing us to track more in memory and only fall back to inspecting the full cluster state (CS) on master failover, like we do in the snapshot shards service). * This PR already contains some best-effort examples of this, but this could still be improved upon considerably (that was left out of this PR to limit its complexity). 3. 
Make the `SnapshotsService` less expensive on the CS thread for large snapshots --- .../java/org/elasticsearch/node/Node.java | 5 +- .../snapshots/SnapshotShardsService.java | 230 +----------------- .../snapshots/SnapshotsService.java | 216 +++++++++++++--- ...UpdateIndexShardSnapshotStatusRequest.java | 101 ++++++++ ...pdateIndexShardSnapshotStatusResponse.java | 37 +++ .../elasticsearch/snapshots/package-info.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 11 +- .../snapshots/SnapshotShardsServiceTests.java | 6 +- 8 files changed, 342 insertions(+), 266 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java create mode 100644 server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2f5ea4581b5a3..e23fdc32031f4 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -555,10 +555,9 @@ protected Node(final Environment initialEnvironment, RepositoriesService repositoryService = repositoriesModule.getRepositoryService(); repositoriesServiceReference.set(repositoryService); SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService, - clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool); + clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters()); SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService, - threadPool, transportService, indicesService, actionModule.getActionFilters(), - clusterModule.getIndexNameExpressionResolver()); + transportService, indicesService); TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService, 
transportService, snapshotShardsService, actionModule.getActionFilters()); RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 89c4e553a8058..ae58101949e35 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -26,33 +26,19 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import 
org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -74,22 +60,18 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableList; -import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** - * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for - * starting and stopping shard level snapshots + * This service runs on data nodes and controls currently running shard snapshots on these nodes. It is responsible for + * starting and stopping shard level snapshots. + * See package level documentation of {@link org.elasticsearch.snapshots} for details. 
*/ public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class); @@ -110,31 +92,21 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = new TransportRequestDeduplicator<>(); - private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); - private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; - public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, - ThreadPool threadPool, TransportService transportService, IndicesService indicesService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + TransportService transportService, IndicesService indicesService) { this.indicesService = indicesService; this.repositoriesService = repositoriesService; this.transportService = transportService; this.clusterService = clusterService; - this.threadPool = threadPool; + this.threadPool = transportService.getThreadPool(); if (DiscoveryNode.isDataNode(settings)) { // this is only useful on the nodes that can hold data clusterService.addListener(this); } - - // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. 
- this.updateSnapshotStatusHandler = - new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); } @Override protected void doStart() { - assert this.updateSnapshotStatusHandler != null; - assert transportService.getRequestHandler(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; } @Override @@ -444,77 +416,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) { } } - /** - * Internal request that is used to send changes in snapshot status to master - */ - public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { - private final Snapshot snapshot; - private final ShardId shardId; - private final ShardSnapshotStatus status; - - public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException { - super(in); - snapshot = new Snapshot(in); - shardId = new ShardId(in); - status = new ShardSnapshotStatus(in); - } - - public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. 
- this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - public Snapshot snapshot() { - return snapshot; - } - - public ShardId shardId() { - return shardId; - } - - public ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o; - return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status); - } - - @Override - public int hashCode() { - return Objects.hash(snapshot, shardId, status); - } - } - /** Notify the master node that the given shard has been successfully snapshotted **/ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) { assert generation != null; @@ -569,125 +470,4 @@ public String executor() { }) ); } - - /** - * Updates the shard status on master node - * - * @param request update shard status request - */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, - ActionListener listener) { - logger.trace("received updated snapshot restore state [{}]", request); - clusterService.submitStateUpdateTask( - "update snapshot state", - request, - ClusterStateTaskConfig.build(Priority.NORMAL), - snapshotStateExecutor, - new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void 
clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); - } - }); - } - - private static class SnapshotStateExecutor implements ClusterStateTaskExecutor { - - @Override - public ClusterTasksResult - execute(ClusterState currentState, List tasks) { - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), - updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } - } - - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? 
- SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); - entries.add(updatedEntry); - } - } else { - entries.add(entry); - } - } - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - return ClusterTasksResult.builder().successes(tasks) - .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, - SnapshotsInProgress.of(unmodifiableList(entries))).build()); - } - } - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - } - - static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { - - UpdateIndexShardSnapshotStatusResponse() {} - - UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { - super(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException {} - } - - private class UpdateSnapshotStatusAction - extends TransportMasterNodeAction { - UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool, - actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver - ); - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - return new UpdateIndexShardSnapshotStatusResponse(in); - } - - @Override - protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, - ActionListener listener) { - innerUpdateSnapshotState(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { - return null; - } 
- } - } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f95ba972ee81a..676fc83c340e0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -31,9 +31,14 @@ import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.RepositoryCleanupInProgress; @@ -43,6 +48,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -63,6 +69,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.StreamInput; import 
org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -77,7 +84,9 @@ import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -100,8 +109,9 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** - * Service responsible for creating snapshots. See package level documentation of {@link org.elasticsearch.snapshots} - * for details. + * Service responsible for creating snapshots. This service runs all the steps executed on the master node during snapshot creation and + * deletion. + * See package level documentation of {@link org.elasticsearch.snapshots} for details. */ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -138,13 +148,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // Set of snapshots that are currently being ended by this node private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); + private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); + private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; + + private final TransportService transportService; + public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - RepositoriesService repositoriesService, ThreadPool threadPool) { + RepositoriesService repositoriesService, TransportService transportService, ActionFilters actionFilters) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; - 
this.threadPool = threadPool; + this.threadPool = transportService.getThreadPool(); + this.transportService = transportService; + // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( + transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); if (DiscoveryNode.isMasterNode(settings)) { // addLowPriorityApplier to make sure that Repository will be created before snapshot clusterService.addLowPriorityApplier(this); @@ -181,10 +200,10 @@ public void createSnapshot(final CreateSnapshotRequest request, final ActionList final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - private SnapshotsInProgress.Entry newSnapshot = null; - private List indices; + private SnapshotsInProgress.Entry newEntry; + @Override public ClusterState execute(ClusterState currentState) { validate(repositoryName, snapshotName, currentState); @@ -213,7 +232,7 @@ public ClusterState execute(ClusterState currentState) { indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - newSnapshot = new SnapshotsInProgress.Entry( + newEntry = new SnapshotsInProgress.Entry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(), State.INIT, @@ -224,28 +243,28 @@ public ClusterState execute(ClusterState currentState) { null, userMeta, Version.CURRENT ); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = SnapshotsInProgress.of(Collections.singletonList(newSnapshot)); + initializingSnapshots.add(newEntry.snapshot()); + snapshots = SnapshotsInProgress.of(Collections.singletonList(newEntry)); return 
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); + if (newEntry != null) { + initializingSnapshots.remove(newEntry.snapshot()); } - newSnapshot = null; + newEntry = null; listener.onFailure(e); } @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); + if (newEntry != null) { + final Snapshot current = newEntry.snapshot(); assert initializingSnapshots.contains(current); assert indices != null; - beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener() { + beginSnapshot(newState, newEntry, request.partial(), indices, repository, new ActionListener() { @Override public void onResponse(final Snapshot snapshot) { initializingSnapshots.remove(snapshot); @@ -447,6 +466,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); assert entry != null; endSnapshot(entry, newState.metadata()); + } else { + endCompletedSnapshots(newState); } } }); @@ -600,15 +621,9 @@ public void applyClusterState(ClusterChangedEvent event) { if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { processStartedShards(); } - // Cleanup all snapshots that have no more work left: - // 1. Completed snapshots - // 2. Snapshots in state INIT that the previous master failed to start - // 3. 
Snapshots in any other state that have all their shard tasks completed - snapshotsInProgress.entries().stream().filter( - entry -> entry.state().completed() - || initializingSnapshots.contains(entry.snapshot()) == false - && (entry.state() == State.INIT || completed(entry.shards().values())) - ).forEach(entry -> endSnapshot(entry, event.state().metadata())); + if (newMaster) { + endCompletedSnapshots(event.state()); + } } if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event.state()); @@ -630,6 +645,20 @@ public void applyClusterState(ClusterChangedEvent event) { assert assertConsistentWithClusterState(event.state()); } + /** + * Cleanup all snapshots found in the given cluster state that have no more work left: + * 1. Completed snapshots + * 2. Snapshots in state INIT that a previous master of an older version failed to start + * 3. Snapshots in any other state that have all their shard tasks completed + */ + private void endCompletedSnapshots(ClusterState state) { + SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + snapshotsInProgress.entries().stream().filter( + entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values()) + ).forEach(entry -> endSnapshot(entry, state.metadata())); + } + private boolean assertConsistentWithClusterState(ClusterState state) { final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) { @@ -672,6 +701,9 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) { */ private void processSnapshotsOnRemovedNodes() { clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + + private boolean changed = false; + @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes nodes = currentState.nodes(); @@ 
-679,7 +711,6 @@ public ClusterState execute(ClusterState currentState) { if (snapshots == null) { return currentState; } - boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; @@ -734,17 +765,26 @@ public ClusterState execute(ClusterState currentState) { public void onFailure(String source, Exception e) { logger.warn("failed to update snapshot state after node removal"); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (changed) { + endCompletedSnapshots(newState); + } + } }); } private void processStartedShards() { clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { + + private boolean changed = false; + @Override public ClusterState execute(ClusterState currentState) { RoutingTable routingTable = currentState.routingTable(); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots != null) { - boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; @@ -764,7 +804,7 @@ public ClusterState execute(ClusterState currentState) { } if (changed) { return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build(); + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); } } return currentState; @@ -775,6 +815,13 @@ public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (changed) { + 
endCompletedSnapshots(newState); + } + } }); } @@ -1559,7 +1606,8 @@ private void addListener(Snapshot snapshot, ActionListener { + + @Override + public ClusterTasksResult + execute(ClusterState currentState, List tasks) { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean updated = false; + + for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.snapshot().equals(updateSnapshotState.snapshot())) { + logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), + updateSnapshotState.shardId(), updateSnapshotState.status().state()); + if (updated == false) { + shards.putAll(entry.shards()); + updated = true; + } + shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + changedCount++; + } + } + + if (updated) { + if (completed(shards.values()) == false) { + entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); + } else { + // Snapshot is finished - mark it as done + // TODO: Add PARTIAL_SUCCESS status? 
+ SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); + entries.add(updatedEntry); + } + } else { + entries.add(entry); + } + } + if (changedCount > 0) { + logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); + return ClusterTasksResult.builder().successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + SnapshotsInProgress.of(entries)).build()); + } + } + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + } + + /** + * Updates the shard status on master node + * + * @param request update shard status request + */ + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, + ActionListener listener) { + logger.trace("received updated snapshot restore state [{}]", request); + clusterService.submitStateUpdateTask( + "update snapshot state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + snapshotStateExecutor, + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); + } finally { + endCompletedSnapshots(newState); + } + } + }); + } + + private class UpdateSnapshotStatusAction + extends TransportMasterNodeAction { + UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool, + actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + 
@Override + protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { + return new UpdateIndexShardSnapshotStatusResponse(in); + } + + @Override + protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, + ActionListener listener) throws Exception { + innerUpdateSnapshotState(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { + return null; + } + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java new file mode 100644 index 0000000000000..1d20d2d109b7d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Objects; + +/** + * Internal request that is used to send changes in snapshot status to master + */ +public class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { + private final Snapshot snapshot; + private final ShardId shardId; + private final SnapshotsInProgress.ShardSnapshotStatus status; + + public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException { + super(in); + snapshot = new Snapshot(in); + shardId = new ShardId(in); + status = new SnapshotsInProgress.ShardSnapshotStatus(in); + } + + public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. 
+ this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + public Snapshot snapshot() { + return snapshot; + } + + public ShardId shardId() { + return shardId; + } + + public SnapshotsInProgress.ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o; + return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status); + } + + @Override + public int hashCode() { + return Objects.hash(snapshot, shardId, status); + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java new file mode 100644 index 0000000000000..6ee95f87498f6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { + + UpdateIndexShardSnapshotStatusResponse() {} + + UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java index d4a31ad41352c..c651b74e69d46 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java +++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java @@ -28,7 +28,7 @@ * {@link org.elasticsearch.cluster.SnapshotsInProgress}. All nodes consume the state of the {@code SnapshotsInProgress} and will start or * abort relevant shard snapshot tasks accordingly. *
  • Nodes that are executing shard snapshot tasks report either success or failure of their snapshot task by submitting a - * {@link org.elasticsearch.snapshots.SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the + * {@link org.elasticsearch.snapshots.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the * snapshot's entry in the cluster state accordingly.
  • * * diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 5ca8fec8451f6..6c09a997cbd15 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1419,8 +1419,9 @@ public void onFailure(final Exception e) { settings, clusterService, transportService, Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool ); - snapshotsService = - new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool); + final ActionFilters actionFilters = new ActionFilters(emptySet()); + snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, + transportService, actionFilters); nodeEnv = new NodeEnvironment(settings, environment); final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); @@ -1453,10 +1454,8 @@ public void onFailure(final Exception e) { null ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); - final ActionFilters actionFilters = new ActionFilters(emptySet()); - snapshotShardsService = new SnapshotShardsService( - settings, clusterService, repositoriesService, threadPool, - transportService, indicesService, actionFilters, indexNameExpressionResolver); + snapshotShardsService = + new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, transportService, allocationService, new BatchedRerouteService(clusterService, allocationService::reroute), diff --git 
a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java index 5a513d1603680..564ee3aecacb9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java @@ -44,18 +44,18 @@ public void testSummarizeFailure() { public void testEqualsAndHashcodeUpdateIndexShardSnapshotStatusRequest() { EqualsHashCodeTestUtils.checkEqualsAndHashCode( - new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest( + new UpdateIndexShardSnapshotStatusRequest( new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))), new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()), randomInt(5)), new SnapshotsInProgress.ShardSnapshotStatus(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))), request -> - new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()), + new UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()), request -> { final boolean mutateSnapshot = randomBoolean(); final boolean mutateShardId = randomBoolean(); final boolean mutateStatus = (mutateSnapshot || mutateShardId) == false || randomBoolean(); - return new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest( + return new UpdateIndexShardSnapshotStatusRequest( mutateSnapshot ? new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))) : request.snapshot(), mutateShardId ?