Skip to content

Commit

Permalink
Move all Snapshot Master Node Steps to SnapshotsService (elastic#56365)…
Browse files Browse the repository at this point in the history
… (elastic#59373)

This refactoring has three motivations:

1. Separate all master node steps during snapshot operations from all data node steps in code.
2. Set up next steps in concurrent repository operations and general improvements by centralizing tracking of each shard's state in the repository in `SnapshotsService` so that operations for each shard can be linearized efficiently (i.e. without having to inspect the full snapshot state for all shards on every cluster state update, allowing us to track more in memory and only fall back to inspecting the full CS on master failover like we do in the snapshot shards service).
    * This PR already contains some best effort examples of this, but obviously this could be way improved upon still (just did not want to do it in this PR for complexity reasons)
3. Make the `SnapshotsService` less expensive on the CS thread for large snapshots
  • Loading branch information
original-brownbear authored Jul 12, 2020
1 parent a6a27b7 commit 4833861
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 266 deletions.
5 changes: 2 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,9 @@ protected Node(final Environment initialEnvironment,
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool);
clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters());
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
threadPool, transportService, indicesService, actionModule.getActionFilters(),
clusterModule.getIndexNameExpressionResolver());
transportService, indicesService);
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService,
transportService, snapshotShardsService, actionModule.getActionFilters());
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,19 @@
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand All @@ -74,22 +60,18 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;

/**
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
* starting and stopping shard level snapshots
* This service runs on data nodes and controls currently running shard snapshots on these nodes. It is responsible for
* starting and stopping shard level snapshots.
* See package level documentation of {@link org.elasticsearch.snapshots} for details.
*/
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class);
Expand All @@ -110,31 +92,21 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
new TransportRequestDeduplicator<>();

private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;

public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
TransportService transportService, IndicesService indicesService) {
this.indicesService = indicesService;
this.repositoriesService = repositoriesService;
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.threadPool = transportService.getThreadPool();
if (DiscoveryNode.isDataNode(settings)) {
// this is only useful on the nodes that can hold data
clusterService.addListener(this);
}

// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler =
new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
}

@Override
protected void doStart() {
assert this.updateSnapshotStatusHandler != null;
assert transportService.getRequestHandler(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
}

@Override
Expand Down Expand Up @@ -444,77 +416,6 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
}
}

/**
* Internal request that is used to send changes in snapshot status to master
*/
public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
private final Snapshot snapshot;
private final ShardId shardId;
private final ShardSnapshotStatus status;

public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException {
super(in);
snapshot = new Snapshot(in);
shardId = new ShardId(in);
status = new ShardSnapshotStatus(in);
}

public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
// By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck.
this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshot.writeTo(out);
shardId.writeTo(out);
status.writeTo(out);
}

public Snapshot snapshot() {
return snapshot;
}

public ShardId shardId() {
return shardId;
}

public ShardSnapshotStatus status() {
return status;
}

@Override
public String toString() {
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o;
return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status);
}

@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, status);
}
}

/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) {
assert generation != null;
Expand Down Expand Up @@ -569,125 +470,4 @@ public String executor() {
})
);
}

/**
* Updates the shard status on master node
*
* @param request update shard status request
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
}
});
}

private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {

@Override
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;

for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(),
updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
}
}

if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
}
} else {
entries.add(entry);
}
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(unmodifiableList(entries))).build());
}
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}
}

static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {

UpdateIndexShardSnapshotStatusResponse() {}

UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {}
}

private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool,
actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver
);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
}

@Override
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
innerUpdateSnapshotState(request, listener);
}

@Override
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
return null;
}
}

}
Loading

0 comments on commit 4833861

Please sign in to comment.