Skip to content

Commit

Permalink
CR: pass around SnapshotsInProgress
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jan 27, 2020
1 parent 08dfd9b commit f2efe16
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -96,20 +97,21 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
final ActionListener<GetSnapshotsResponse> listener) {
final String[] repositories = request.repositories();
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.wrap(
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
() -> getMultipleReposSnapshotInfo(state, response.repositories(), request.snapshots(),
request.ignoreUnavailable(), request.verbose(), listener)),
() -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}

private void getMultipleReposSnapshotInfo(ClusterState state, List<RepositoryMetaData> repos, String[] snapshots,
private void getMultipleReposSnapshotInfo(SnapshotsInProgress snapshotsInProgress, List<RepositoryMetaData> repos, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, ActionListener<GetSnapshotsResponse> listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
Expand All @@ -133,16 +135,16 @@ private void getMultipleReposSnapshotInfo(ClusterState state, List<RepositoryMet
} else {
groupedListener.onFailure(e);
}
}), wrappedListener -> getSingleRepoSnapshotInfo(state, repoName, snapshots, ignoreUnavailable, verbose,
}), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}

private void getSingleRepoSnapshotInfo(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
ActionListener<List<SnapshotInfo>> listener) {
private void getSingleRepoSnapshotInfo(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, ActionListener<List<SnapshotInfo>> listener) {
final Map<String, SnapshotId> allSnapshotIds = new HashMap<>();
final List<SnapshotInfo> currentSnapshots = new ArrayList<>();
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(state, repo)) {
for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
Expand All @@ -155,13 +157,13 @@ private void getSingleRepoSnapshotInfo(ClusterState state, String repo, String[]
snapshotsService.getRepositoryData(repo, repositoryDataListener);
}

repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
loadSnapshotInfos(state, repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}

private List<SnapshotInfo> loadSnapshotInfos(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable,
boolean verbose, Map<String, SnapshotId> allSnapshotIds,
private List<SnapshotInfo> loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, Map<String, SnapshotId> allSnapshotIds,
List<SnapshotInfo> currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
Expand Down Expand Up @@ -198,7 +200,7 @@ private List<SnapshotInfo> loadSnapshotInfos(ClusterState state, String repo, St

final List<SnapshotInfo> snapshotInfos;
if (verbose) {
snapshotInfos = snapshotsService.snapshots(state, repo, new ArrayList<>(toResolve), ignoreUnavailable);
snapshotInfos = snapshotsService.snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ protected SnapshotsStatusResponse read(StreamInput in) throws IOException {
protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
List<SnapshotsInProgress.Entry> currentSnapshots =
SnapshotsService.currentSnapshots(state, request.repository(), Arrays.asList(request.snapshots()));
SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
buildResponse(state, request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
return;
}

Expand All @@ -127,16 +128,17 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
l -> buildResponse(state, request, currentSnapshots, nodeSnapshotStatuses, l))
l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(state, request, currentSnapshots, null, listener);
buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
}

}

private void buildResponse(ClusterState state, SnapshotsStatusRequest request, List<SnapshotsInProgress.Entry> currentSnapshotEntries,
private void buildResponse(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener<SnapshotsStatusResponse> listener) {
// First process snapshot that are currently processed
Expand Down Expand Up @@ -198,13 +200,13 @@ private void buildResponse(ClusterState state, SnapshotsStatusRequest request, L
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
loadRepositoryData(state, request, builder, currentSnapshotNames, repositoryName, listener);
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}

private void loadRepositoryData(ClusterState clusterState, SnapshotsStatusRequest request, List<SnapshotStatus> builder,
private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request, List<SnapshotStatus> builder,
Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
Expand All @@ -231,7 +233,7 @@ private void loadRepositoryData(ClusterState clusterState, SnapshotsStatusReques
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
SnapshotInfo snapshotInfo = snapshotsService.snapshot(clusterState, repositoryName, snapshotId);
SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map<ShardId, IndexShardSnapshotStatus> shardStatuses =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,15 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
* @param state cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
public SnapshotInfo snapshot(ClusterState state, final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries = currentSnapshots(state, repositoryName, Collections.singletonList(snapshotId.getName()));
public SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, final String repositoryName, final SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries =
currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
Expand All @@ -191,20 +192,20 @@ public SnapshotInfo snapshot(ClusterState state, final String repositoryName, fi
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
* @param state cluster state
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @param snapshotIds snapshots for which to fetch snapshot information
* @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
* if false, they will throw an error
* @return list of snapshots
*/
public List<SnapshotInfo> snapshots(ClusterState state, String repositoryName, List<SnapshotId> snapshotIds,
public List<SnapshotInfo> snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName, List<SnapshotId> snapshotIds,
boolean ignoreUnavailable) {
final Set<SnapshotInfo> snapshotSet = new HashSet<>();
final Set<SnapshotId> snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
final List<SnapshotsInProgress.Entry> entries =
currentSnapshots(state, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
final List<SnapshotsInProgress.Entry> entries = currentSnapshots(
snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
Expand Down Expand Up @@ -233,13 +234,13 @@ public List<SnapshotInfo> snapshots(ClusterState state, String repositoryName, L
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
* @param state cluster state
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @return list of snapshots
*/
public static List<SnapshotInfo> currentSnapshots(ClusterState state, String repositoryName) {
public static List<SnapshotInfo> currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List<SnapshotInfo> snapshotList = new ArrayList<>();
List<SnapshotsInProgress.Entry> entries = currentSnapshots(state, repositoryName, Collections.emptyList());
List<SnapshotsInProgress.Entry> entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
Expand Down Expand Up @@ -677,12 +678,13 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* This method is executed on master node
* </p>
*
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @param snapshotsInProgress snapshots in progress in the cluster state
* @param repository repository id
* @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
public static List<SnapshotsInProgress.Entry> currentSnapshots(ClusterState state, String repository, List<String> snapshots) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
public static List<SnapshotsInProgress.Entry> currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repository,
List<String> snapshots) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -1227,7 +1229,7 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
clusterService.state(), repositoryName, Collections.emptyList()).stream()
clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
Expand Down

0 comments on commit f2efe16

Please sign in to comment.