From 1e5f77eb1942e77f09d901961f73184ad1294730 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Mon, 27 Jan 2020 11:58:51 +0100
Subject: [PATCH] Use Consistent ClusterState throughout Snapshot API Calls
(#51464)
We shouldn't be using potentially changing versions of the cluster state
when answering a snapshot status API call by calling `SnapshotService#currentSnapshots` multiple times (each time using `ClusterService#state` under the hood) but instead pass down the state from the transport action.
Having these API behave more in a more deterministic way will make it easier to use them once parallel repository operations
are introduced.
---
.../get/TransportGetSnapshotsAction.java | 31 +++++++------
.../TransportSnapshotsStatusAction.java | 24 ++++++-----
.../snapshots/SnapshotsService.java | 43 +++++++++++--------
3 files changed, 55 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
index a488ed413897e..09508ace30002 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java
@@ -32,6 +32,7 @@
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -96,6 +97,7 @@ protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterS
protected void masterOperation(Task task, final GetSnapshotsRequest request, final ClusterState state,
final ActionListener listener) {
final String[] repositories = request.repositories();
+ final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
transportService.sendChildRequest(transportService.getLocalNode(), GetRepositoriesAction.NAME,
new GetRepositoriesRequest(repositories), task, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
@@ -103,14 +105,15 @@ protected void masterOperation(Task task, final GetSnapshotsRequest request, fin
response ->
// switch to GENERIC thread pool because it might be long running operation
threadPool.executor(ThreadPool.Names.GENERIC).execute(
- () -> getMultipleReposSnapshotInfo(response.repositories(), request.snapshots(),
- request.ignoreUnavailable(), request.verbose(), listener)),
+ () -> getMultipleReposSnapshotInfo(snapshotsInProgress, response.repositories(),
+ request.snapshots(), request.ignoreUnavailable(), request.verbose(), listener)),
listener::onFailure),
GetRepositoriesResponse::new));
}
- private void getMultipleReposSnapshotInfo(List repos, String[] snapshots, boolean ignoreUnavailable,
- boolean verbose, ActionListener listener) {
+ private void getMultipleReposSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, List repos,
+ String[] snapshots, boolean ignoreUnavailable, boolean verbose,
+ ActionListener listener) {
// short-circuit if there are no repos, because we can not create GroupedActionListener of size 0
if (repos.isEmpty()) {
listener.onResponse(new GetSnapshotsResponse(Collections.emptyList()));
@@ -133,16 +136,16 @@ private void getMultipleReposSnapshotInfo(List repos, String
} else {
groupedListener.onFailure(e);
}
- }), wrappedListener -> getSingleRepoSnapshotInfo(repoName, snapshots, ignoreUnavailable, verbose,
+ }), wrappedListener -> getSingleRepoSnapshotInfo(snapshotsInProgress, repoName, snapshots, ignoreUnavailable, verbose,
ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos)))));
}
}
- private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
- ActionListener> listener) {
+ private void getSingleRepoSnapshotInfo(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
+ boolean ignoreUnavailable, boolean verbose, ActionListener> listener) {
final Map allSnapshotIds = new HashMap<>();
final List currentSnapshots = new ArrayList<>();
- for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repo)) {
+ for (SnapshotInfo snapshotInfo : SnapshotsService.currentSnapshots(snapshotsInProgress, repo)) {
SnapshotId snapshotId = snapshotInfo.snapshotId();
allSnapshotIds.put(snapshotId.getName(), snapshotId);
currentSnapshots.add(snapshotInfo);
@@ -155,14 +158,14 @@ private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean
snapshotsService.getRepositoryData(repo, repositoryDataListener);
}
- repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(
- loadSnapshotInfos(repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
+ repositoryDataListener.whenComplete(repositoryData -> listener.onResponse(loadSnapshotInfos(snapshotsInProgress, repo, snapshots,
+ ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)),
listener::onFailure);
}
- private List loadSnapshotInfos(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose,
- Map allSnapshotIds, List currentSnapshots,
- @Nullable RepositoryData repositoryData) {
+ private List loadSnapshotInfos(@Nullable SnapshotsInProgress snapshotsInProgress, String repo, String[] snapshots,
+ boolean ignoreUnavailable, boolean verbose, Map allSnapshotIds,
+ List currentSnapshots, @Nullable RepositoryData repositoryData) {
if (repositoryData != null) {
for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) {
allSnapshotIds.put(snapshotId.getName(), snapshotId);
@@ -198,7 +201,7 @@ private List loadSnapshotInfos(String repo, String[] snapshots, bo
final List snapshotInfos;
if (verbose) {
- snapshotInfos = snapshotsService.snapshots(repo, new ArrayList<>(toResolve), ignoreUnavailable);
+ snapshotInfos = snapshotsService.snapshots(snapshotsInProgress, repo, new ArrayList<>(toResolve), ignoreUnavailable);
} else {
if (repositoryData != null) {
// want non-current snapshots as well, which are found in the repository data
diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
index 8e7c85760749f..e50036e1e4b0b 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java
@@ -35,6 +35,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -100,10 +101,11 @@ protected SnapshotsStatusResponse read(StreamInput in) throws IOException {
protected void masterOperation(Task task, final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener listener) throws Exception {
+ final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
List currentSnapshots =
- snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots()));
+ SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
- buildResponse(request, currentSnapshots, null, listener);
+ buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
return;
}
@@ -127,18 +129,17 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request,
.snapshots(snapshots).timeout(request.masterNodeTimeout()),
ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute(
ActionRunnable.wrap(listener,
- l -> buildResponse(
- request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())),
- nodeSnapshotStatuses, l))
+ l -> buildResponse(snapshotsInProgress, request, currentSnapshots, nodeSnapshotStatuses, l))
), listener::onFailure));
} else {
// We don't have any in-progress shards, just return current stats
- buildResponse(request, currentSnapshots, null, listener);
+ buildResponse(snapshotsInProgress, request, currentSnapshots, null, listener);
}
}
- private void buildResponse(SnapshotsStatusRequest request, List currentSnapshotEntries,
+ private void buildResponse(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
+ List currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener listener) {
// First process snapshot that are currently processed
@@ -200,14 +201,15 @@ private void buildResponse(SnapshotsStatusRequest request, List 0) {
- loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener);
+ loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
}
- private void loadRepositoryData(SnapshotsStatusRequest request, List builder, Set currentSnapshotNames,
- String repositoryName, ActionListener listener) {
+ private void loadRepositoryData(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
+ List builder, Set currentSnapshotNames, String repositoryName,
+ ActionListener listener) {
final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final StepListener repositoryDataListener = new StepListener<>();
snapshotsService.getRepositoryData(repositoryName, repositoryDataListener);
@@ -232,7 +234,7 @@ private void loadRepositoryData(SnapshotsStatusRequest request, List shardStatusBuilder = new ArrayList<>();
if (snapshotInfo.state().completed()) {
Map shardStatuses =
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 4a1b6e84b639a..abb0f04ea0101 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -174,13 +174,15 @@ public void getRepositoryData(final String repositoryName, final ActionListener<
/**
* Retrieves snapshot from repository
*
- * @param repositoryName repository name
- * @param snapshotId snapshot id
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repositoryName repository name
+ * @param snapshotId snapshot id
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
- public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snapshotId) {
- List entries = currentSnapshots(repositoryName, Collections.singletonList(snapshotId.getName()));
+ public SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
+ List entries =
+ currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {
return inProgressSnapshot(entries.iterator().next());
}
@@ -190,18 +192,20 @@ public SnapshotInfo snapshot(final String repositoryName, final SnapshotId snaps
/**
* Returns a list of snapshots from repository sorted by snapshot creation date
*
- * @param repositoryName repository name
- * @param snapshotIds snapshots for which to fetch snapshot information
- * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
- * if false, they will throw an error
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repositoryName repository name
+ * @param snapshotIds snapshots for which to fetch snapshot information
+ * @param ignoreUnavailable if true, snapshots that could not be read will only be logged with a warning,
+ * if false, they will throw an error
* @return list of snapshots
*/
- public List snapshots(final String repositoryName, final List snapshotIds, final boolean ignoreUnavailable) {
+ public List snapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName,
+ List snapshotIds, boolean ignoreUnavailable) {
final Set snapshotSet = new HashSet<>();
final Set snapshotIdsToIterate = new HashSet<>(snapshotIds);
// first, look at the snapshots in progress
- final List entries =
- currentSnapshots(repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
+ final List entries = currentSnapshots(
+ snapshotsInProgress, repositoryName, snapshotIdsToIterate.stream().map(SnapshotId::getName).collect(Collectors.toList()));
for (SnapshotsInProgress.Entry entry : entries) {
snapshotSet.add(inProgressSnapshot(entry));
snapshotIdsToIterate.remove(entry.snapshot().getSnapshotId());
@@ -230,12 +234,13 @@ public List snapshots(final String repositoryName, final List currentSnapshots(final String repositoryName) {
+ public static List currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName) {
List snapshotList = new ArrayList<>();
- List entries = currentSnapshots(repositoryName, Collections.emptyList());
+ List entries = currentSnapshots(snapshotsInProgress, repositoryName, Collections.emptyList());
for (SnapshotsInProgress.Entry entry : entries) {
snapshotList.add(inProgressSnapshot(entry));
}
@@ -673,12 +678,13 @@ private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry)
* This method is executed on master node
*
*
- * @param repository repository id
- * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
+ * @param snapshotsInProgress snapshots in progress in the cluster state
+ * @param repository repository id
+ * @param snapshots list of snapshots that will be used as a filter, empty list means no snapshots are filtered
* @return list of metadata for currently running snapshots
*/
- public List currentSnapshots(final String repository, final List snapshots) {
- SnapshotsInProgress snapshotsInProgress = clusterService.state().custom(SnapshotsInProgress.TYPE);
+ public static List currentSnapshots(@Nullable SnapshotsInProgress snapshotsInProgress, String repository,
+ List snapshots) {
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
return Collections.emptyList();
}
@@ -1222,7 +1228,8 @@ public void deleteSnapshot(final String repositoryName, final String snapshotNam
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
- Optional matchedInProgress = currentSnapshots(repositoryName, Collections.emptyList()).stream()
+ Optional matchedInProgress = currentSnapshots(
+ clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());