diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java index abdffd6eed93a..068298d70e927 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SnapshotStatusApisIT.java @@ -51,11 +51,13 @@ import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase { @@ -514,6 +516,38 @@ public void testGetSnapshotsRequest() throws Exception { awaitNoMoreRunningOperations(); } + public void testConcurrentCreateAndStatusAPICalls() throws Exception { + for (int i = 0; i < randomIntBetween(1, 10); i++) { + createIndexWithContent("test-idx-" + i); + } + final String repoName = "test-repo"; + createRepository(repoName, "fs"); + final int snapshots = randomIntBetween(10, 20); + final List> statuses = new ArrayList<>(snapshots); + final List> gets = new ArrayList<>(snapshots); + final Client dataNodeClient = dataNodeClient(); + final String[] snapshotNames = createNSnapshots(repoName, snapshots).toArray(Strings.EMPTY_ARRAY); + + for (int i = 0; i < snapshots; i++) { + statuses.add(dataNodeClient.admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotNames).execute()); + gets.add(dataNodeClient.admin().cluster().prepareGetSnapshots(repoName).setSnapshots(snapshotNames).execute()); + } + + for (ActionFuture status : statuses) { + assertThat(status.get().getSnapshots(), hasSize(snapshots)); + for (SnapshotStatus snapshot : status.get().getSnapshots()) { + assertThat(snapshot.getState(), allOf(not(SnapshotsInProgress.State.FAILED), not(SnapshotsInProgress.State.ABORTED))); + } + } + for (ActionFuture get : gets) { + final List snapshotInfos = get.get().getSnapshots(repoName); + assertThat(snapshotInfos, hasSize(snapshots)); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + } + } + } + private static SnapshotIndexShardStatus stateFirstShard(SnapshotStatus snapshotStatus, String indexName) { return snapshotStatus.getIndices().get(indexName).getShards().get(0); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 1f3d97f33fd97..b357dcad45552 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -25,7 +25,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.node.NodeClient; @@ -39,6 +38,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -238,9 +238,9 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho List builder, Set currentSnapshotNames, String repositoryName, ActionListener listener) { final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); - final StepListener repositoryDataListener = new StepListener<>(); + final ListenableFuture repositoryDataListener = new ListenableFuture<>(); repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); - repositoryDataListener.whenComplete(repositoryData -> { + repositoryDataListener.addListener(ActionListener.wrap(repositoryData -> { final Map matchedSnapshotIds = repositoryData.getSnapshotIds().stream() .filter(s -> requestedSnapshotNames.contains(s.getName())) .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); @@ -295,7 +295,7 @@ private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, Snapsho } } listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); - }, listener::onFailure); + }, listener::onFailure), threadPool.generic()); } /**