From f2efe16f149c787e75fae3fe80042c5c303ee63d Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 27 Jan 2020 09:55:24 +0100
Subject: [PATCH] CR: pass around SnapshotsInProgress
---
.../get/TransportGetSnapshotsAction.java | 26 ++++++-----
.../TransportSnapshotsStatusAction.java | 18 ++++----
.../snapshots/SnapshotsService.java | 44 ++++++++++---------
3 files changed, 47 insertions(+), 41 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
index f54a522e41e6..67af3f70fe1d 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
@@ -32,6 +32,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -96,6 +97,7 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
final ActionListener listener) {
final String[] repositories = request.repositories();
+ final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
@@ -103,13 +105,13 @@ protected void masterOperation(Task task, final GetSnapshotsRequest request, fin
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
- () -> getMultipleReposSnapshotInfo(state, response.repositories(), request.snapshots(),
- request.ignoreUnavailable(), request.verbose(), listener)),
+ () -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
+ request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}
- private void getMultipleReposSnapshotInfo(ClusterState state, List repos, String[] snapshots,
+ private void getMultipleReposSnapshotInfo(SnapshotsInProgress snapshotsInProgress, List repos, String[] snapshots,
boolean ignoreUnavailable, boolean verbose, ActionListener listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
@@ -133,16 +135,16 @@ private void getMultipleReposSnapshotInfo(ClusterState state, List getSingleRepoSnapshotInfo(state, repoName, snapshots, ignoreUnavailable, verbose,
+ }), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}
- private void getSingleRepoSnapshotInfo(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
- ActionListener> listener) {
+ private void getSingleRepoSnapshotInfo(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
+ boolean ignoreUnavailable, boolean verbose, ActionListener> listener) {
final Map allSnapshotIds = new HashMap<>();
final List currentSnapshots = new ArrayList<>();
- for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(state, repo)) {
+ for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
@@ -155,13 +157,13 @@ private void getSingleRepoSnapshotInfo(ClusterState state, String repo, String[]
snapshotsService.getRepositoryData(repo, repositoryDataListener);
}
- repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
- loadSnapshotInfos(state, repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
+ repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
+ ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}
- private List loadSnapshotInfos(ClusterState state, String repo, String[] snapshots, boolean ignoreUnavailable,
- boolean verbose, Map allSnapshotIds,
+ private List loadSnapshotInfos(SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
+ boolean ignoreUnavailable, boolean verbose, Map allSnapshotIds,
List currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
@@ -198,7 +200,7 @@ private List loadSnapshotInfos(ClusterState state, String repo, St
final List snapshotInfos;
if (verbose) {
- snapshotInfos = snapshotsService.snapshots(state, repo, new ArrayList<>(toResolve), ignoreUnavailable);
+ snapshotInfos = snapshotsService.snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
index 573e7a9d20e3..3c2031729645 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
@@ -100,10 +100,11 @@ protected SnapshotsStatusResponse read(StreamInput in) throws IOException {
protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener listener) throws Exception {
+ final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
List currentSnapshots =
- SnapshotsService.currentSnapshots(state, request.repository(), Arrays.asList(request.snapshots()));
+ SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
- buildResponse(state, request, currentSnapshots, null, listener);
+ buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
return;
}
@@ -127,16 +128,17 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
- l -> buildResponse(state, request, currentSnapshots, nodeSnapshotStatuses, l))
+ l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
- buildResponse(state, request, currentSnapshots, null, listener);
+ buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
}
}
- private void buildResponse(ClusterState state, SnapshotsStatusRequest request, List currentSnapshotEntries,
+ private void buildResponse(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
+ List currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener listener) {
// First process snapshot that are currently processed
@@ -198,13 +200,13 @@ private void buildResponse(ClusterState state, SnapshotsStatusRequest request, L
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) {
- loadRepositoryData(state, request, builder, currentSnapshotNames, repositoryName, listener);
+ loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}
- private void loadRepositoryData(ClusterState clusterState, SnapshotsStatusRequest request, List builder,
+ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request, List builder,
Set currentSnapshotNames, String repositoryName,
ActionListener listener) {
final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots());
@@ -231,7 +233,7 @@ private void loadRepositoryData(ClusterState clusterState, SnapshotsStatusReques
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
- SnapshotInfo snapshotInfo = snapshotsService.snapshot(clusterState, repositoryName, snapshotId);
+ SnapshotInfo snapshotInfo = snapshotsService.snapshot(snapshotsInProgress, repositoryName, snapshotId);
List shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map shardStatuses =
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index ebc25b352697..a9f32caa9e66 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -174,14 +174,15 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
- * @param state cluster state
- * @param repositoryName repository name
- * @param snapshotId snapshot id
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repositoryName repository name
+ * @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
- public SnapshotInfo snapshot(ClusterState state, final String repositoryName, final SnapshotId snapshotId) {
- List entries = currentSnapshots(state, repositoryName, Collections.singletonList(snapshotId.getName()));
+ public SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, final String repositoryName, final SnapshotId snapshotId) {
+ List entries =
+ currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
@@ -191,20 +192,20 @@ public SnapshotInfo snapshot(ClusterState state, final String repositoryName, fi
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
- * @param state cluster state
- * @param repositoryName repository name
- * @param snapshotIds snapshots for which to fetch snapshot information
- * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
- * if false, they will throw an error
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repositoryName repository name
+ * @param snapshotIds snapshots for which to fetch snapshot information
+ * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
+ * if false, they will throw an error
* @return list of snapshots
*/
- public List snapshots(ClusterState state, String repositoryName, List snapshotIds,
+ public List snapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName, List snapshotIds,
boolean ignoreUnavailable) {
final Set snapshotSet = new HashSet<>();
final Set snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
- final List entries =
- currentSnapshots(state, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
+ final List entries = currentSnapshots(
+ snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
@@ -233,13 +234,13 @@ public List snapshots(ClusterState state, String repositoryName, L
/**
* Returns a list of currently running snapshots from repository sorted by snapshot creation date
*
- * @param state cluster state
+ * @param snapshotsInProgress snapshots in progress in the cluster state
* @param repositoryName repository name
* @return list of snapshots
*/
- public static List currentSnapshots(ClusterState state, String repositoryName) {
+ public static List currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List snapshotList = new ArrayList<>();
- List entries = currentSnapshots(state, repositoryName, Collections.emptyList());
+ List entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
@@ -677,12 +678,13 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* This method is executed on master node
*
*
- * @param repository repository id
- * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repository repository id
+ * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
- public static List currentSnapshots(ClusterState state, String repository, List snapshots) {
- SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
+ public static List currentSnapshots(SnapshotsInProgress snapshotsInProgress, String repository,
+ List snapshots) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
@@ -1227,7 +1229,7 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional matchedInProgress = currentSnapshots(
- clusterService.state(), repositoryName, Collections.emptyList()).stream()
+ clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());