Remove Snapshot INIT Step (elastic#55918)
With elastic#55773, the snapshot INIT state step has become obsolete. We can set up the snapshot directly in a single step, simplifying the state machine.

This is a big help for building concurrent snapshots because it allows us to establish a deterministic order of operations between snapshot create and delete operations, since all of their entries now contain a repository generation. With this change, simple queuing of snapshot operations can and will be added in a follow-up.
original-brownbear committed Jul 12, 2020
1 parent 4833861 commit 480199d
Showing 6 changed files with 168 additions and 113 deletions.
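
To illustrate the simplification described in the commit message, here is a minimal, self-contained Java sketch. It is a toy model, not Elasticsearch code: the Entry class, its field names, and the generation value are illustrative assumptions. The point it shows is that a snapshot entry is now created in a single step, already in STARTED state and already pinned to the repository generation it was based on, which is what allows create and delete operations to be ordered deterministically.

    // Toy model of the simplified snapshot state machine (illustrative only, not Elasticsearch code).
    import java.util.ArrayList;
    import java.util.List;

    public class SnapshotEntrySketch {

        enum State { STARTED, SUCCESS, FAILED } // note: no INIT state

        static final class Entry {
            final String snapshot;
            final State state;
            final long repositoryGeneration; // repository generation the entry was created against

            Entry(String snapshot, State state, long repositoryGeneration) {
                this.snapshot = snapshot;
                this.state = state;
                this.repositoryGeneration = repositoryGeneration;
            }

            @Override
            public String toString() {
                return snapshot + "/" + state + "@gen=" + repositoryGeneration;
            }
        }

        public static void main(String[] args) {
            List<Entry> snapshotsInProgress = new ArrayList<>();

            // Single-step creation: the entry starts out as STARTED and is pinned to a
            // concrete repository generation instead of passing through an INIT phase first.
            long repoGeneration = 42L; // hypothetical generation read from the repository
            snapshotsInProgress.add(new Entry("test-snap", State.STARTED, repoGeneration));

            // A concurrent delete (or another create) that reads this entry knows exactly
            // which repository generation the snapshot was built on, giving a deterministic
            // order of operations between create and delete.
            System.out.println(snapshotsInProgress);
        }
    }
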
@@ -30,7 +30,6 @@
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.plugins.Plugin;
@@ -46,10 +45,6 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@@ -77,84 +72,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
.build();
}

public void testDisruptionOnSnapshotInitialization() throws Exception {
final String idxName = "test";
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);

createRandomIndex(idxName);

logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

// Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization
// by the current master and the former master. It is not causing any issues in real life scenario, but
// might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures.
logger.info("--> initializing the repository");
assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state());

final String masterNode1 = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>();
otherNodes.addAll(allMasterEligibleNodes);
otherNodes.remove(masterNode1);
otherNodes.add(dataNode);

NetworkDisruption networkDisruption =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
NetworkDisruption.UNRESPONSIVE);
internalCluster().setDisruptionScheme(networkDisruption);

ClusterService clusterService = internalCluster().clusterService(masterNode1);
CountDownLatch disruptionStarted = new CountDownLatch(1);
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().size() > 0) {
if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) {
// The snapshot started, we can start disruption so the INIT state will arrive to another master node
logger.info("--> starting disruption");
networkDisruption.startDisrupting();
clusterService.removeListener(this);
disruptionStarted.countDown();
}
}
}
});

logger.info("--> starting snapshot");
ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
.prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute();

logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));

awaitNoMoreRunningOperations(dataNode);

logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
try {
assertSnapshotExists("test-repo", "test-snap-2");
} catch (SnapshotMissingException exception) {
logger.info("--> done verifying, snapshot doesn't exist");
}
}, 1, TimeUnit.MINUTES);

logger.info("--> stopping disrupting");
networkDisruption.stopDisrupting();
ensureStableCluster(4, masterNode1);
logger.info("--> done");

future.get();
awaitNoMoreRunningOperations(masterNode1);
}

public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
internalCluster().startMasterOnlyNodes(3);
@@ -2246,8 +2246,8 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception {
.put("block_on_data", true));


String dataStream = "test-ds";
DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", org.elasticsearch.common.collect.List.of(dataStream));
String dataStream = "datastream";
DataStreamIT.putComposableIndexTemplate("dst", "@timestamp", Collections.singletonList(dataStream));

logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
@@ -2272,7 +2272,7 @@ public void testDeleteDataStreamDuringSnapshot() throws Exception {
client.admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{dataStream})).actionGet();
fail("Expected deleting index to fail during snapshot");
} catch (SnapshotInProgressException e) {
assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: [test-ds"));
assertThat(e.getMessage(), containsString("Cannot delete data streams that are being snapshotted: ["+dataStream));
} finally {
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
@@ -73,10 +73,18 @@ protected ClusterBlockException checkBlock(CreateSnapshotRequest request, Cluste
@Override
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
if (state.nodes().getMinNodeVersion().before(SnapshotsService.NO_REPO_INITIALIZE_VERSION)) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshotLegacy(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshotLegacy(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}
}
143 changes: 141 additions & 2 deletions server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -173,11 +173,13 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN
/**
* Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of
* the snapshot.
* Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards
* compatible path for initializing the snapshot in the repository is executed.
*
* @param request snapshot request
* @param listener snapshot completion listener
*/
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
public void executeSnapshotLegacy(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
createSnapshot(request,
ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure));
}
@@ -187,11 +189,13 @@ public void executeSnapshot(final CreateSnapshotRequest request, final ActionLis
* <p>
* This method is used by clients to start a snapshot. It makes sure that no snapshots are currently running and
* creates a snapshot record in the cluster state metadata.
* Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards
* compatible path for initializing the snapshot in the repository is executed.
*
* @param request snapshot request
* @param listener snapshot creation listener
*/
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
public void createSnapshotLegacy(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
@@ -287,6 +291,139 @@ public TimeValue timeout() {
});
}

/**
* Same as {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} but invokes its callback on completion of
* the snapshot.
*
* @param request snapshot request
* @param listener snapshot completion listener
*/
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
createSnapshot(request,
ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure));
}

/**
* Initializes the snapshotting process.
* <p>
* This method is used by clients to start a snapshot. It makes sure that no snapshots are currently running and
* creates a snapshot record in the cluster state metadata.
*
* @param request snapshot request
* @param listener snapshot creation listener
*/
public void createSnapshot(final CreateSnapshotRequest request, final ActionListener<Snapshot> listener) {
final String repositoryName = request.repository();
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(request.repository());
if (repository.isReadOnly()) {
listener.onFailure(
new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"));
return;
}
final Snapshot snapshot = new Snapshot(repositoryName, snapshotId);
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask() {

private SnapshotsInProgress.Entry newEntry;

@Override
public ClusterState execute(ClusterState currentState) {
// check if the snapshot name already exists in the repository
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}
validate(repositoryName, snapshotName, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
// Fail if there are any concurrently running snapshots. The only exception to this being a snapshot in INIT state from a
// previous master that we can simply ignore and remove from the cluster state because we would clean it up from the
// cluster state anyway in #applyClusterState.
if (snapshots != null && snapshots.entries().stream().anyMatch(entry -> entry.state() != State.INIT)) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
}
// Store newSnapshot here to be processed in clusterStateProcessed
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState, request));
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);

final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
final List<String> dataStreams =
indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices());
final Version version = minCompatibleVersion(
clusterService.state().nodes().getMinNodeVersion(), repositoryName, repositoryData, null);
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
shards(currentState, indexIds, useShardGenerations(version), repositoryData);
if (request.partial() == false) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
currentState.metadata());
Set<String> missing = indicesWithMissingShards.v1();
Set<String> closed = indicesWithMissingShards.v2();
if (missing.isEmpty() == false || closed.isEmpty() == false) {
final StringBuilder failureMessage = new StringBuilder();
if (missing.isEmpty() == false) {
failureMessage.append("Indices don't have primary shards ");
failureMessage.append(missing);
}
if (closed.isEmpty() == false) {
if (failureMessage.length() > 0) {
failureMessage.append("; ");
}
failureMessage.append("Indices are closed ");
}
// TODO: We should just throw here instead of creating a FAILED and hence useless snapshot in the repository
newEntry = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), false,
State.FAILED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
failureMessage.toString(), userMeta, version);
}
}
if (newEntry == null) {
newEntry = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(),
State.STARTED, indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards,
null, userMeta, version);
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(Collections.singletonList(newEntry))).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
try {
logger.info("snapshot [{}] started", snapshot);
listener.onResponse(snapshot);
} finally {
if (newEntry.state().completed() || newEntry.shards().isEmpty()) {
endSnapshot(newEntry, newState.metadata());
}
}
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
}

/**
* Validates snapshot request
*
Expand Down Expand Up @@ -332,6 +469,8 @@ private static void validate(final String repositoryName, final String snapshotN
* Starts snapshot.
* <p>
* Creates snapshot in repository and updates snapshot metadata record with list of shards that needs to be processed.
* Note: This method is only used in clusters that contain a node older than {@link #NO_REPO_INITIALIZE_VERSION} to ensure a backwards
* compatible path for initializing the snapshot in the repository is executed.
*
* @param clusterState cluster state
* @param snapshot snapshot meta data