Skip to content

Commit

Permalink
Add Functionality to Consistently Read RepositoryData For CS Updates (#…
Browse files Browse the repository at this point in the history
…55773)

Using optimistic locking, add the ability to run a repository state
update task with a consistent view of the current repository data.
Allows for a follow-up to remove the snapshot INIT state.
  • Loading branch information
original-brownbear authored Apr 28, 2020
1 parent b614412 commit c1fca12
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -42,6 +43,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class FilterRepository implements Repository {
Expand Down Expand Up @@ -146,6 +148,12 @@ public void updateState(ClusterState state) {
in.updateState(state);
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}

@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -43,6 +44,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -256,6 +258,21 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s
*/
void updateState(ClusterState state);

/**
* Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
* task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as the did the state
* at the time the {@code RepositoryData} was loaded.
* This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
* one point in time like for example, checking if a snapshot is in the repository before adding the delete operation for it to the
* cluster state.
*
* @param createUpdateTask function to supply cluster state update task
* @param source the source of the cluster state update task
* @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
*/
void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure);

/**
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
* during snapshot initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -327,6 +328,67 @@ protected void doClose() {
}
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() {
final RepositoryMetadata repositoryMetadataStart = metadata;
getRepositoryData(ActionListener.wrap(repositoryData -> {
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {

private boolean executedTask = false;

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// Comparing the full metadata here on purpose instead of simply comparing the safe generation.
// If the safe generation has changed, then we have to reload repository data and start over.
// If the pending generation has changed we are in the midst of a write operation and might pick up the
// updated repository data and state on the retry. We don't want to wait for the write to finish though
// because it could fail for any number of reasons so we just retry instead of waiting on the cluster state
// to change in any form.
if (repositoryMetadataStart.equals(getRepoMetadata(currentState))) {
executedTask = true;
return updateTask.execute(currentState);
}
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
if (executedTask) {
updateTask.onFailure(source, e);
} else {
onFailure.accept(e);
}
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (executedTask) {
updateTask.clusterStateProcessed(source, oldState, newState);
} else {
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}
}

@Override
public TimeValue timeout() {
return updateTask.timeout();
}
});
}, onFailure));
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
});
}

// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
// #latestKnownRepoGen if a newer than currently known generation is found
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
* the repository.
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
* given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}.
* given names in the repository and deletes them.
*
* @param request delete snapshot request
* @param listener listener
Expand Down Expand Up @@ -1092,18 +1092,23 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (runningSnapshot == null) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData ->
deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName),
repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure))));
try {
repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
"delete completed snapshots", listener::onFailure);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
}
return;
}
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(runningSnapshot, ActionListener.wrap(
result -> {
logger.debug("deleted snapshot completed - deleting files");
deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName,
result.v1().getGenId(), Priority.IMMEDIATE, listener);
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
},
e -> {
if (abortedDuringInit) {
Expand Down Expand Up @@ -1174,23 +1179,33 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh
return snapshotEntry;
}

/**
* Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state.
*
* @param snapshotIds Snapshots to delete
* @param repoName Repository name
* @param repositoryStateId Repository generation to base the delete on
* @param listener Listener to complete when done
*/
private void deleteCompletedSnapshots(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId, Priority priority,
ActionListener<Void> listener) {
private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
@Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
// Short circuit to noop state update if there isn't anything to delete
if (snapshotIds.isEmpty()) {
listener.onResponse(null);
return;
return new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}

@Override
public TimeValue timeout() {
return timeout;
}
};
}
logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId,
priority);
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
return new ClusterStateUpdateTask(priority) {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
Expand Down Expand Up @@ -1246,7 +1261,7 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
}
});
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -225,6 +227,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
public void updateState(final ClusterState state) {
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
}

@Override
public Lifecycle.State lifecycleState() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -220,6 +221,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;

public class SnapshotResiliencyTests extends ESTestCase {
Expand Down Expand Up @@ -506,7 +508,6 @@ public void clusterChanged(ClusterChangedEvent event) {
assertEquals(0, snapshotInfo.failedShards());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55702")
public void testConcurrentSnapshotCreateAndDeleteOther() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

Expand Down Expand Up @@ -738,6 +739,63 @@ public void onFailure(Exception e) {
assertEquals(0, snapshotInfo.failedShards());
}

public void testConcurrentDeletes() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));

String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);

TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());

final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();

continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));

final Collection<StepListener<Boolean>> deleteSnapshotStepListeners = List.of(new StepListener<>(), new StepListener<>());

final AtomicInteger successfulDeletes = new AtomicInteger(0);

continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(
ActionListener.wrap(
resp -> deleteListener.onResponse(true),
e -> {
final Throwable unwrapped = ExceptionsHelper.unwrap(
e, ConcurrentSnapshotExecutionException.class, SnapshotMissingException.class);
assertThat(unwrapped, notNullValue());
deleteListener.onResponse(false);
}));
}
});

for (StepListener<Boolean> deleteListener : deleteSnapshotStepListeners) {
continueOrDie(deleteListener, deleted -> {
if (deleted) {
successfulDeletes.incrementAndGet();
}
});
}

deterministicTaskQueue.runAllRunnableTasks();

SnapshotDeletionsInProgress deletionsInProgress = masterNode.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
assertFalse(deletionsInProgress.hasDeletionsInProgress());
final Repository repository = masterNode.repositoriesService.repository(repoName);
final RepositoryData repositoryData = getRepositoryData(repository);
Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
// We end up with no snapshots since at least one of the deletes worked out
assertThat(snapshotIds, empty());
assertThat(successfulDeletes.get(), either(is(1)).or(is(2)));
// We did one snapshot and one delete so we went two steps from the empty generation (-1) to 1
assertThat(repositoryData.getGenId(), is(1L));
}

/**
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
* deleting a snapshot.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -153,4 +155,10 @@ public void verify(String verificationToken, DiscoveryNode localNode) {
@Override
public void updateState(final ClusterState state) {
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
throw new UnsupportedOperationException("Unsupported for restore-only repository");
}
}
Loading

0 comments on commit c1fca12

Please sign in to comment.