Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Consistent ClusterState throughout Snapshot API Calls #51464

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -96,21 +97,23 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
final ActionListener<GetSnapshotsResponse> listener) {
final String[] repositories = request.repositories();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> getMultipleReposSnapshotInfo(response.repositories(), request.snapshots(),
request.ignoreUnavailable(), request.verbose(), listener)),
() -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}

private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, List<RepositoryMetaData> repos,
String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<GetSnapshotsResponse> listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList()));
Expand All @@ -133,16 +136,16 @@ private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String
} else {
groupedListener.onFailure(e);
}
}), wrappedListener -> getSingleRepoSnapshotInfo(repoName, snapshots, ignoreUnavailable, verbose,
}), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}

private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<List<SnapshotInfo>> listener) {
private void getSingleRepoSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, ActionListener<List<SnapshotInfo>> listener) {
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repo)) {
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
Expand All @@ -155,14 +158,14 @@ private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean
snapshotsService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
loadSnapshotInfos(repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}

private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
Map<String, SnapshotId> allSnapshotIds, List<SnapshotInfo> currentSnapshots,
@Nullable RepositoryData repositoryData) {
private List<SnapshotInfo> loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
Expand Down Expand Up @@ -198,7 +201,7 @@ private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, bo

final List<SnapshotInfo> snapshotInfos;
if (verbose) {
snapshotInfos = snapshotsService.snapshots(repo, new ArrayList<>(toResolve), ignoreUnavailable);
snapshotInfos = snapshotsService.snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -100,10 +101,11 @@ protected SnapshotsStatusResponse read(StreamInput in) throws IOException {
protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
buildResponse(request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
return;
}

Expand All @@ -127,18 +129,17 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
l -> buildResponse(
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
nodeSnapshotStatuses, l))
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
}

}

private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
private void buildResponse(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener<SnapshotsStatusResponse> listener) {
// First process snapshot that are currently processed
Expand Down Expand Up @@ -200,14 +201,15 @@ private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgr
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}

private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotStatus> builder, Set<String> currentSnapshotNames,
String repositoryName, ActionListener<SnapshotsStatusResponse> listener) {
private void loadRepositoryData(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
Expand All @@ -232,7 +234,7 @@ private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotSta
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshotsService.snapshot(repositoryName, snapshotId);
SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
* @param repositoryName repository name
* @param snapshotId snapshot id
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
public SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries =
currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
Expand All @@ -190,18 +192,20 @@ public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snaps
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @return list of snapshots
*/
public List<SnapshotInfo> snapshots(final String repositoryName, final List<SnapshotId> snapshotIds, final boolean ignoreUnavailable) {
public List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries =
currentSnapshots(repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
final List<SnapshotsInProgress.Entry> entries = currentSnapshots(
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
Expand Down Expand Up @@ -230,12 +234,13 @@ public List<SnapshotInfo> snapshots(final String repositoryName, final List<Snap
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @return list of snapshots
*/
public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
public static List<SnapshotInfo> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.emptyList());
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
Expand Down Expand Up @@ -673,12 +678,13 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* This method is executed on master node
* </p>
*
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
public List<SnapshotsInProgress.Entry> currentSnapshots(final String repository, final List<String> snapshots) {
SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
public static List<SnapshotsInProgress.Entry> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repository,
List<String> snapshots) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -1222,7 +1228,8 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the fact that we're using a potentially racy CS here, leading to some needless snapshot delete failures, obvious was the main motivation behind this change actually (a bigger change to deletes making use of this cleanup is coming in #51463).

clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
Expand Down