Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Consistent ClusterState throughout Snapshot API Calls #51464

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ protected void masterOperation(Task task, final GetSnapshotsRequest request, fin
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> getMultipleReposSnapshotInfo(response.repositories(), request.snapshots(),
() -> getMultipleReposSnapshotInfo(state, response.repositories(), request.snapshots(),
request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}

private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
private void getMultipleReposSnapshotInfo(ClusterState state, List<RepositoryMetaData> repos, String[] snapshots,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just pass down SnapshotsInProgress? Makes it a bit clearer what the dependencies are

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure sounds good :) => f2efe16

boolean ignoreUnavailable, boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList()));
Expand All @@ -133,16 +133,16 @@ private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String
} else {
groupedListener.onFailure(e);
}
}), wrappedListener -> getSingleRepoSnapshotInfo(repoName, snapshots, ignoreUnavailable, verbose,
}), wrappedListener -> getSingleRepoSnapshotInfo(state, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}

private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
private void getSingleRepoSnapshotInfo(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<List<SnapshotInfo>> listener) {
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repo)) {
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(state, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
Expand All @@ -156,13 +156,13 @@ private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean
}

repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
loadSnapshotInfos(repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
loadSnapshotInfos(state, repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}

private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
Map<String, SnapshotId> allSnapshotIds, List<SnapshotInfo> currentSnapshots,
@Nullable RepositoryData repositoryData) {
private List<SnapshotInfo> loadSnapshotInfos(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
Expand Down Expand Up @@ -198,7 +198,7 @@ private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, bo

final List<SnapshotInfo> snapshotInfos;
if (verbose) {
snapshotInfos = snapshotsService.snapshots(repo, new ArrayList<>(toResolve), ignoreUnavailable);
snapshotInfos = snapshotsService.snapshots(state, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
SnapshotsService.currentSnapshots(state, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
buildResponse(request, currentSnapshots, null, listener);
buildResponse(state, request, currentSnapshots, null, listener);
return;
}

Expand All @@ -127,18 +127,16 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
l -> buildResponse(
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
nodeSnapshotStatuses, l))
l -> buildResponse(state, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(request, currentSnapshots, null, listener);
buildResponse(state, request, currentSnapshots, null, listener);
}

}

private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
private void buildResponse(ClusterState state, SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener<SnapshotsStatusResponse> listener) {
// First process snapshot that are currently processed
Expand Down Expand Up @@ -200,14 +198,15 @@ private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgr
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
loadRepositoryData(state, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}

private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotStatus> builder, Set<String> currentSnapshotNames,
String repositoryName, ActionListener<SnapshotsStatusResponse> listener) {
private void loadRepositoryData(ClusterState clusterState, SnapshotsStatusRequest request, List<SnapshotStatus> builder,
Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
Expand All @@ -232,7 +231,7 @@ private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotSta
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshotsService.snapshot(repositoryName, snapshotId);
SnapshotInfo snapshotInfo = snapshotsService.snapshot(clusterState, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,14 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
* @param state cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
public SnapshotInfo snapshot(ClusterState state, final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(state, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
Expand All @@ -190,18 +191,20 @@ public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snaps
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
* @param state cluster state
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @return list of snapshots
*/
public List<SnapshotInfo> snapshots(final String repositoryName, final List<SnapshotId> snapshotIds, final boolean ignoreUnavailable) {
public List<SnapshotInfo> snapshots(ClusterState state, String repositoryName, List<SnapshotId> snapshotIds,
boolean ignoreUnavailable) {
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries =
currentSnapshots(repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
currentSnapshots(state, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
Expand Down Expand Up @@ -230,12 +233,13 @@ public List<SnapshotInfo> snapshots(final String repositoryName, final List<Snap
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
* @param state cluster state
* @param repositoryName repository name
* @return list of snapshots
*/
public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
public static List<SnapshotInfo> currentSnapshots(ClusterState state, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.emptyList());
List<SnapshotsInProgress.Entry> entries = currentSnapshots(state, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
Expand Down Expand Up @@ -677,8 +681,8 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
public List<SnapshotsInProgress.Entry> currentSnapshots(final String repository, final List<String> snapshots) {
SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
public static List<SnapshotsInProgress.Entry> currentSnapshots(ClusterState state, String repository, List<String> snapshots) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -1222,7 +1226,8 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making the fact that we're using a potentially racy CS here, leading to some needless snapshot delete failures, obvious was the main motivation behind this change actually (a bigger change to deletes making use of this cleanup is coming in #51463).

clusterService.state(), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
Expand Down