Skip to content

Commit

Permalink
Use Consistent ClusterState throughout Snapshot API Calls (#51464)
Browse files Browse the repository at this point in the history
We shouldn't be using potentially changing versions of the cluster state
when answering a snapshot status API call by calling `SnapshotService#currentSnapshots` multiple times (each time using `ClusterService#state` under the hood) but instead pass down the state from the transport action.
Having these API behave more in a more deterministic way will make it easier to use them once parallel repository operations
are introduced.
  • Loading branch information
original-brownbear authored Jan 27, 2020
1 parent fc57826 commit 1e5f77e
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -96,21 +97,23 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
final ActionListener<GetSnapshotsResponse> listener) {
final String[] repositories = request.repositories();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> getMultipleReposSnapshotInfo(response.repositories(), request.snapshots(),
request.ignoreUnavailable(), request.verbose(), listener)),
() -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}

private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, List<RepositoryMetaData> repos,
String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<GetSnapshotsResponse> listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList()));
Expand All @@ -133,16 +136,16 @@ private void getMultipleReposSnapshotInfo(List<RepositoryMetaData> repos, String
} else {
groupedListener.onFailure(e);
}
}), wrappedListener -> getSingleRepoSnapshotInfo(repoName, snapshots, ignoreUnavailable, verbose,
}), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}

private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<List<SnapshotInfo>> listener) {
private void getSingleRepoSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, ActionListener<List<SnapshotInfo>> listener) {
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repo)) {
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
Expand All @@ -155,14 +158,14 @@ private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean
snapshotsService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
loadSnapshotInfos(repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}

private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
Map<String, SnapshotId> allSnapshotIds, List<SnapshotInfo> currentSnapshots,
@Nullable RepositoryData repositoryData) {
private List<SnapshotInfo> loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
Expand Down Expand Up @@ -198,7 +201,7 @@ private List<SnapshotInfo> loadSnapshotInfos(String repo, String[] snapshots, bo

final List<SnapshotInfo> snapshotInfos;
if (verbose) {
snapshotInfos = snapshotsService.snapshots(repo, new ArrayList<>(toResolve), ignoreUnavailable);
snapshotInfos = snapshotsService.snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -100,10 +101,11 @@ protected SnapshotsStatusResponse read(StreamInput in) throws IOException {
protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> currentSnapshots =
snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
buildResponse(request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
return;
}

Expand All @@ -127,18 +129,17 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
l -> buildResponse(
request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
nodeSnapshotStatuses, l))
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
}

}

private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
private void buildResponse(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener<SnapshotsStatusResponse> listener) {
// First process snapshot that are currently processed
Expand Down Expand Up @@ -200,14 +201,15 @@ private void buildResponse(SnapshotsStatusRequest request, List<SnapshotsInProgr
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}

private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotStatus> builder, Set<String> currentSnapshotNames,
String repositoryName, ActionListener<SnapshotsStatusResponse> listener) {
private void loadRepositoryData(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
Expand All @@ -232,7 +234,7 @@ private void loadRepositoryData(SnapshotsStatusRequest request, List<SnapshotSta
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshotsService.snapshot(repositoryName, snapshotId);
SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
* @param repositoryName repository name
* @param snapshotId snapshot id
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
public SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries =
currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
Expand All @@ -190,18 +192,20 @@ public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snaps
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @return list of snapshots
*/
public List<SnapshotInfo> snapshots(final String repositoryName, final List<SnapshotId> snapshotIds, final boolean ignoreUnavailable) {
public List<SnapshotInfo> snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
List<SnapshotId> snapshotIds, boolean ignoreUnavailable) {
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries =
currentSnapshots(repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
final List<SnapshotsInProgress.Entry> entries = currentSnapshots(
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
Expand Down Expand Up @@ -230,12 +234,13 @@ public List<SnapshotInfo> snapshots(final String repositoryName, final List<Snap
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @return list of snapshots
*/
public List<SnapshotInfo> currentSnapshots(final String repositoryName) {
public static List<SnapshotInfo> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries = currentSnapshots(repositoryName, Collections.emptyList());
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
Expand Down Expand Up @@ -673,12 +678,13 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* This method is executed on master node
* </p>
*
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
public List<SnapshotsInProgress.Entry> currentSnapshots(final String repository, final List<String> snapshots) {
SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
public static List<SnapshotsInProgress.Entry> currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repository,
List<String> snapshots) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -1222,7 +1228,8 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
Expand Down

0 comments on commit 1e5f77e

Please sign in to comment.