Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Functionality to Consistently Read RepositoryData For CS Updates (#55773) #56091

Merged
merged 1 commit into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;

/**
* Transport action for delete snapshot operation
Expand Down Expand Up @@ -71,7 +70,6 @@ protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, Cluste
/**
 * Runs the delete-snapshot operation on the master node by handing the request to {@code snapshotsService}.
 * The {@code Void} result reported by the service is mapped to an {@code AcknowledgedResponse(true)} before it reaches
 * {@code listener}; failures are forwarded through the mapped listener.
 *
 * @param request  delete snapshot request, passed through to the snapshots service
 * @param state    cluster state supplied by the transport framework; not read by this method
 * @param listener listener completed with the acknowledged response or with the failure
 */
@Override
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
                               final ActionListener<AcknowledgedResponse> listener) {
    // Submit the delete exactly once, using the request-based overload. The superseded
    // (repository, snapshot names, listener) call is dropped so the listener cannot be completed twice.
    snapshotsService.deleteSnapshots(request, ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand All @@ -42,6 +43,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class FilterRepository implements Repository {
Expand Down Expand Up @@ -151,6 +153,12 @@ public void updateState(ClusterState state) {
in.updateState(state);
}

/**
 * Forwards the consistent state update request, with all three arguments unchanged, to the delegate repository {@code in}.
 */
@Override
public void executeConsistentStateUpdate(
    Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
    String source,
    Consumer<Exception> onFailure
) {
    in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}

@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
Expand All @@ -43,6 +44,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -269,6 +271,21 @@ void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId s
*/
void updateState(ClusterState state);

/**
 * Execute a cluster state update with a consistent view of the current {@link RepositoryData}. The {@link ClusterState} passed to the
 * task generated through {@code createUpdateTask} is guaranteed to point at the same state for this repository as did the state
 * at the time the {@code RepositoryData} was loaded.
 * This allows for operations on the repository that need a consistent view of both the cluster state and the repository contents at
 * one point in time, for example checking if a snapshot is in the repository before adding the delete operation for it to the
 * cluster state.
 *
 * @param createUpdateTask function to supply cluster state update task
 * @param source           the source of the cluster state update task
 * @param onFailure        error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
 */
void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure);

/**
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
* during snapshot initialization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -337,6 +338,67 @@ protected void doClose() {
}
}

/**
 * Loads the {@link RepositoryData} on the generic thread pool, builds the caller's cluster state update task from it and
 * submits a wrapping task. The wrapper only runs the caller's task if the repository metadata found in the cluster state
 * still equals the metadata captured before the load; otherwise it leaves the state untouched and starts the whole
 * procedure over once the no-op update has been processed.
 *
 * @param createUpdateTask function building the cluster state update task from the loaded repository data
 * @param source           source string used when submitting the cluster state update task
 * @param onFailure        handler for failures that happen before the caller's task has been executed
 */
@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
                                         Consumer<Exception> onFailure) {
    threadPool.generic().execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            onFailure.accept(e);
        }

        @Override
        protected void doRun() {
            // Capture the metadata before loading so a concurrent change can be detected inside the state update.
            final RepositoryMetadata metadataBeforeLoad = metadata;
            getRepositoryData(ActionListener.wrap(loadedData -> {
                final ClusterStateUpdateTask delegate = createUpdateTask.apply(loadedData);
                clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(delegate.priority()) {

                    // Flipped to true right before the delegate's execute method is invoked.
                    private boolean delegated = false;

                    @Override
                    public TimeValue timeout() {
                        return delegate.timeout();
                    }

                    @Override
                    public ClusterState execute(ClusterState currentState) throws Exception {
                        // The complete metadata is compared deliberately rather than only the safe generation:
                        //  - a changed safe generation means the repository data is stale and must be reloaded;
                        //  - a changed pending generation means a write is in flight, so the retry may pick up the
                        //    updated repository data and state.
                        // The in-flight write is not awaited since it may fail for any number of reasons; the
                        // operation is simply retried instead of waiting for some cluster state change.
                        if (metadataBeforeLoad.equals(getRepoMetadata(currentState)) == false) {
                            return currentState;
                        }
                        delegated = true;
                        return delegate.execute(currentState);
                    }

                    @Override
                    public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                        if (delegated == false) {
                            // Metadata moved underneath us: reload the repository data and try again.
                            executeConsistentStateUpdate(createUpdateTask, source, onFailure);
                            return;
                        }
                        delegate.clusterStateProcessed(source, oldState, newState);
                    }

                    @Override
                    public void onFailure(String source, Exception e) {
                        if (delegated == false) {
                            onFailure.accept(e);
                            return;
                        }
                        delegate.onFailure(source, e);
                    }
                });
            }, onFailure));
        }
    });
}

// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
// #latestKnownRepoGen if a newer than currently known generation is found
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
Expand Down Expand Up @@ -1000,14 +1001,17 @@ private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
* the repository.
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
* given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}.
* given names in the repository and deletes them.
*
* @param repositoryName repositoryName
* @param snapshotNames snapshotNames
* @param request delete snapshot request
* @param listener listener
*/
public void deleteSnapshots(final String repositoryName, final Collection<String> snapshotNames, final ActionListener<Void> listener) {
logger.info("deleting snapshots {} from repository [{}]", snapshotNames, repositoryName);
public void deleteSnapshots(final DeleteSnapshotRequest request, final ActionListener<Void> listener) {

final String[] snapshotNames = request.snapshots();
final String repositoryName = request.repository();
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));

clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {

Expand All @@ -1017,15 +1021,15 @@ public void deleteSnapshots(final String repositoryName, final Collection<String

@Override
public ClusterState execute(ClusterState currentState) {
if (snapshotNames.size() > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
+ "]");
}
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry snapshotEntry;
if (snapshotNames.size() == 1) {
final String snapshotName = snapshotNames.iterator().next();
if (snapshotNames.length == 1) {
final String snapshotName = snapshotNames[0];
if (Regex.isSimpleMatchPattern(snapshotName)) {
snapshotEntry = null;
} else {
Expand Down Expand Up @@ -1101,18 +1105,23 @@ public void onFailure(String source, Exception e) {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (runningSnapshot == null) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData ->
deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName),
repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure))));
try {
repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
"delete completed snapshots", listener::onFailure);
} catch (RepositoryMissingException e) {
listener.onFailure(e);
}
return;
}
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(runningSnapshot, ActionListener.wrap(
result -> {
logger.debug("deleted snapshot completed - deleting files");
deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName,
result.v1().getGenId(), Priority.IMMEDIATE, listener);
clusterService.submitStateUpdateTask("delete snapshot",
createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
},
e -> {
if (abortedDuringInit) {
Expand All @@ -1133,10 +1142,15 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
}
));
}

@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
});
}

private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, Collection<String> snapshotsOrPatterns,
private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns,
String repositoryName) {
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
Collectors.toMap(SnapshotId::getName, Function.identity()));
Expand Down Expand Up @@ -1178,23 +1192,33 @@ private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable Snapsh
return snapshotEntry;
}

/**
* Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state.
*
* @param snapshotIds Snapshots to delete
* @param repoName Repository name
* @param repositoryStateId Repository generation to base the delete on
* @param listener Listener to complete when done
*/
private void deleteCompletedSnapshots(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId, Priority priority,
ActionListener<Void> listener) {
private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
@Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
// Short circuit to noop state update if there isn't anything to delete
if (snapshotIds.isEmpty()) {
listener.onResponse(null);
return;
return new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}

@Override
public TimeValue timeout() {
return timeout;
}
};
}
logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId,
priority);
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
return new ClusterStateUpdateTask(priority) {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
Expand Down Expand Up @@ -1250,7 +1274,7 @@ public void onFailure(String source, Exception e) {
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
}
});
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
Expand Down Expand Up @@ -53,6 +54,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -230,6 +232,11 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
public void updateState(final ClusterState state) {
}

@Override
public void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure) {
// No-op: neither the task factory nor the failure handler is ever invoked by this implementation.
}

@Override
public Lifecycle.State lifecycleState() {
return null;
Expand Down
Loading