From 94930fa5fe5c49f6ad1baca7dfc09881df0f6eff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Wed, 11 Aug 2021 12:03:03 +0200 Subject: [PATCH] Compute latest snapshot directly in TransportGetShardSnapshotAction (#76338) Backport of #76254 --- .../plan/ShardSnapshotsServiceIT.java | 55 +++++++++++-------- .../repositories/IndexSnapshotsServiceIT.java | 40 ++++++++------ .../get/shard/GetShardSnapshotResponse.java | 21 +++---- .../TransportGetShardSnapshotAction.java | 10 ++-- .../recovery/plan/ShardSnapshotsService.java | 42 +++++--------- .../plan/SnapshotsRecoveryPlannerService.java | 25 ++++----- ...ardSnapshotResponseSerializationTests.java | 15 ++--- .../SnapshotsRecoveryPlannerServiceTests.java | 27 ++++----- 8 files changed, 117 insertions(+), 118 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java index 16b6ff34a2b0f..0c0ea765a79f9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java @@ -47,9 +47,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -137,8 +137,8 @@ public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Ex createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build()); ShardId shardId = getShardIdForIndex(indexName); - List shardSnapshotData = getShardSnapshotShard(shardId); - assertThat(shardSnapshotData, is(empty())); + Optional shardSnapshot = getLatestShardSnapshot(shardId); + assertThat(shardSnapshot.isPresent(), is(equalTo(false))); } public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception { @@ -171,18 +171,22 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception { createSnapshot(repositoryName, snapshotName, indexName); } - List shardSnapshotDataForShard = getShardSnapshotShard(shardId); + Optional latestShardSnapshot = getLatestShardSnapshot(shardId); - assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories))); - for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { - assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); - assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); + if (numberOfRecoveryEnabledRepositories == 0) { + assertThat(latestShardSnapshot.isPresent(), is(equalTo(false))); + } else { + assertThat(latestShardSnapshot.isPresent(), is(equalTo(true))); + ShardSnapshot shardSnapshotData = latestShardSnapshot.get(); ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); - assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId))); - assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName))); assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true))); assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false))); + + assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); + + assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId))); + assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName))); } } @@ -199,12 +203,14 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep int numberOfFailingRepos = randomIntBetween(1, 3); List> failingRepos = new ArrayList<>(); + List failingRepoNames = new ArrayList<>(); for (int i = 0; i < numberOfFailingRepos; i++) { String repositoryName = "failing-repo-" + i; Path repoPath = randomRepoPath(); createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true); createSnapshot(repositoryName, snapshotName, indexName); failingRepos.add(Tuple.tuple(repositoryName, repoPath)); + failingRepoNames.add(repositoryName); } int numberOfWorkingRepositories = randomIntBetween(0, 4); @@ -227,20 +233,25 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1()) .setType(FailingRepoPlugin.TYPE) .setVerify(false) - .setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath())) + .setSettings(Settings.builder().put(repoFailureType, true).put("location", failingRepo.v2())) ); } - List shardSnapshotDataForShard = getShardSnapshotShard(shardId); + Optional latestShardSnapshot = getLatestShardSnapshot(shardId); + + if (numberOfWorkingRepositories == 0) { + assertThat(latestShardSnapshot.isPresent(), is(equalTo(false))); + } else { + assertThat(latestShardSnapshot.isPresent(), is(equalTo(true))); + ShardSnapshot shardSnapshotData = latestShardSnapshot.get(); + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); + assertThat(workingRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true))); + assertThat(failingRepoNames.contains(shardSnapshotInfo.getRepository()), is(equalTo(false))); - assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories))); - for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) { - assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true))); assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0))); - ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo(); - assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId)); - assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName)); + assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId))); + assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName))); } } @@ -268,15 +279,15 @@ protected boolean masterSupportsFetchingLatestSnapshots() { } }; - PlainActionFuture> latestSnapshots = PlainActionFuture.newFuture(); + PlainActionFuture> latestSnapshots = PlainActionFuture.newFuture(); shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots); - assertThat(latestSnapshots.actionGet(), is(empty())); + assertThat(latestSnapshots.actionGet().isPresent(), is(equalTo(false))); } - private List getShardSnapshotShard(ShardId shardId) throws Exception { + private Optional getLatestShardSnapshot(ShardId shardId) throws Exception { ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService(); - PlainActionFuture> future = PlainActionFuture.newFuture(); + PlainActionFuture> future = PlainActionFuture.newFuture(); shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future); return future.get(); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java index 4c69b2bb2306f..4cee3a7b2bb85 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/IndexSnapshotsServiceIT.java @@ -30,7 +30,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -54,7 +53,7 @@ public void testGetShardSnapshotFromUnknownRepoReturnsAnError() throws Exception if (useMultipleUnknownRepositories) { GetShardSnapshotResponse response = responseFuture.get(); - assertThat(response.getRepositoryShardSnapshots(), is(anEmptyMap())); + assertThat(response.getLatestShardSnapshot().isPresent(), is(equalTo(false))); final Map failures = response.getRepositoryFailures(); for (String repository : repositories) { @@ -209,9 +208,11 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP final String indexName = "test-idx"; createIndexWithContent(indexName); - createSnapshot(failingRepoName, "empty-snap", Collections.singletonList(indexName)); + int snapshotIdx = 0; + createSnapshot(failingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName)); + SnapshotInfo latestSnapshot = null; for (String workingRepoName : workingRepoNames) { - createSnapshot(workingRepoName, "empty-snap", Collections.singletonList(indexName)); + latestSnapshot = createSnapshot(workingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName)); } final MockRepository repository = getRepositoryOnMaster(failingRepoName); @@ -240,11 +241,17 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP for (String workingRepoName : workingRepoNames) { assertThat(response.getFailureForRepository(workingRepoName).isPresent(), is(equalTo(false))); - assertThat(response.getIndexShardSnapshotInfoForRepository(workingRepoName).isPresent(), equalTo(true)); } + + Optional shardSnapshotInfoOpt = response.getLatestShardSnapshot(); + + assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true)); + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get(); + assertThat(shardSnapshotInfo.getSnapshot(), equalTo(latestSnapshot.snapshot())); + assertThat(shardSnapshotInfo.getRepository(), equalTo(latestSnapshot.repository())); } - public void testGetShardSnapshotInMultipleRepositories() { + public void testGetShardSnapshotInMultipleRepositoriesReturnsTheLatestSnapshot() { int repoCount = randomIntBetween(2, 10); List repositories = new ArrayList<>(); for (int i = 0; i < repoCount; i++) { @@ -256,21 +263,21 @@ public void testGetShardSnapshotInMultipleRepositories() { final String indexName = "test-idx"; createIndexWithContent(indexName); - Map repositorySnapshots = new HashMap<>(); + int snapshotIdx = 0; + SnapshotInfo expectedLatestSnapshot = null; for (String repository : repositories) { - repositorySnapshots.put(repository, createSnapshot(repository, "snap-1", Collections.singletonList(indexName))); + expectedLatestSnapshot = createSnapshot(repository, "snap-" + snapshotIdx++, Collections.singletonList(indexName)); } GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(repositories, indexName, 0).actionGet(); - for (String repository : repositories) { - assertThat(response.getFailureForRepository(repository).isPresent(), is(equalTo(false))); - Optional shardSnapshotInfoOpt = response.getIndexShardSnapshotInfoForRepository(repository); - assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true)); + assertThat(response.getRepositoryFailures(), is(anEmptyMap())); + Optional shardSnapshotInfoOpt = response.getLatestShardSnapshot(); - ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get(); - assertThat(shardSnapshotInfo.getSnapshot(), equalTo(repositorySnapshots.get(repository).snapshot())); - } + assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true)); + ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get(); + assertThat(shardSnapshotInfo.getSnapshot(), equalTo(expectedLatestSnapshot.snapshot())); + assertThat(shardSnapshotInfo.getRepository(), equalTo(expectedLatestSnapshot.repository())); } public void testFailedSnapshotsAreNotReturned() throws Exception { @@ -308,12 +315,13 @@ public void testFailedSnapshotsAreNotReturned() throws Exception { Optional latestSnapshotForShard = getLatestSnapshotForShard(repoName, indexName, 0); assertThat(latestSnapshotForShard.isPresent(), equalTo(true)); assertThat(latestSnapshotForShard.get().getSnapshot(), equalTo(snapshotInfo.snapshot())); + assertThat(latestSnapshotForShard.get().getRepository(), equalTo(snapshotInfo.repository())); } private Optional getLatestSnapshotForShard(String repository, String indexName, int shard) { final GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(Collections.singletonList(repository), indexName, shard) .actionGet(); - return response.getIndexShardSnapshotInfoForRepository(repository); + return response.getLatestShardSnapshot(); } private PlainActionFuture getLatestSnapshotForShardFuture( diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java index cc49de8d5e855..08870248f2b8d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponse.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.ShardSnapshotInfo; @@ -20,38 +21,34 @@ import java.util.Optional; public class GetShardSnapshotResponse extends ActionResponse { - public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap()); + public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(null, Collections.emptyMap()); - private final Map repositoryShardSnapshots; + private final ShardSnapshotInfo latestShardSnapshot; private final Map repositoryFailures; - GetShardSnapshotResponse(Map repositoryShardSnapshots, Map repositoryFailures) { - this.repositoryShardSnapshots = repositoryShardSnapshots; + GetShardSnapshotResponse(@Nullable ShardSnapshotInfo latestShardSnapshot, Map repositoryFailures) { + this.latestShardSnapshot = latestShardSnapshot; this.repositoryFailures = repositoryFailures; } GetShardSnapshotResponse(StreamInput in) throws IOException { super(in); - this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new); + this.latestShardSnapshot = in.readOptionalWriteable(ShardSnapshotInfo::new); this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o)); + out.writeOptionalWriteable(latestShardSnapshot); out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o)); } - public Optional getIndexShardSnapshotInfoForRepository(String repositoryName) { - return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName)); - } - public Optional getFailureForRepository(String repository) { return Optional.ofNullable(repositoryFailures.get(repository)); } - public Map getRepositoryShardSnapshots() { - return repositoryShardSnapshots; + public Optional getLatestShardSnapshot() { + return Optional.ofNullable(latestShardSnapshot); } public Map getRepositoryFailures() { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java index d0d46a69998e5..19dd870d38ffc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/TransportGetShardSnapshotAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.transport.TransportService; import java.util.Collection; +import java.util.Comparator; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -40,7 +41,8 @@ import java.util.stream.Collectors; public class TransportGetShardSnapshotAction extends TransportMasterNodeAction { - + private static final Comparator LATEST_SNAPSHOT_COMPARATOR = Comparator.comparing(ShardSnapshotInfo::getStartedAt) + .thenComparing(snapshotInfo -> snapshotInfo.getSnapshot().getSnapshotId()); private final IndexSnapshotsService indexSnapshotsService; @Inject @@ -120,19 +122,19 @@ private void getShardSnapshots( private GetShardSnapshotResponse transformToResponse( Collection, RepositoryException>> shardSnapshots ) { - final Map repositoryShardSnapshot = shardSnapshots.stream() + final Optional latestSnapshot = shardSnapshots.stream() .map(Tuple::v1) .filter(Objects::nonNull) .filter(Optional::isPresent) .map(Optional::get) - .collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity())); + .max(LATEST_SNAPSHOT_COMPARATOR); final Map failures = shardSnapshots.stream() .map(Tuple::v2) .filter(Objects::nonNull) .collect(Collectors.toMap(RepositoryException::repository, Function.identity())); - return new GetShardSnapshotResponse(repositoryShardSnapshot, failures); + return new GetShardSnapshotResponse(latestSnapshot.orElse(null), failures); } private Set getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java index f0d51e7f65b04..f72a3a01b1df0 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java @@ -30,10 +30,8 @@ import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; import static org.elasticsearch.indices.recovery.RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION; @@ -56,7 +54,7 @@ public ShardSnapshotsService(Client client, this.clusterService = clusterService; } - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { assert shardId != null : "SharId was null but a value was expected"; final RepositoriesMetadata currentReposMetadata = clusterService.state() @@ -72,7 +70,7 @@ public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener fetchSnapshotFiles(GetShardSnapshotResponse shardSnapshotResponse) { + private Optional fetchSnapshotFiles(GetShardSnapshotResponse shardSnapshotResponse) { assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC); - if (shardSnapshotResponse.getRepositoryShardSnapshots().isEmpty()) { - return Collections.emptyList(); + final Optional latestShardSnapshotOpt = shardSnapshotResponse.getLatestShardSnapshot(); + if (latestShardSnapshotOpt.isPresent() == false) { + return Optional.empty(); } - Collection shardSnapshots = shardSnapshotResponse.getRepositoryShardSnapshots().values(); - List shardSnapshotData = new ArrayList<>(shardSnapshots.size()); - for (ShardSnapshotInfo shardSnapshot : shardSnapshots) { - final List snapshotFiles = getSnapshotFileList(shardSnapshot); - if (snapshotFiles.isEmpty() == false) { - shardSnapshotData.add(new ShardSnapshot(shardSnapshot, snapshotFiles)); - } - } - return shardSnapshotData; - } - - private List getSnapshotFileList(ShardSnapshotInfo shardSnapshotInfo) { + final ShardSnapshotInfo latestShardSnapshot = latestShardSnapshotOpt.get(); try { - final Snapshot snapshot = shardSnapshotInfo.getSnapshot(); + final Snapshot snapshot = latestShardSnapshot.getSnapshot(); final Repository repository = repositoriesService.repository(snapshot.getRepository()); if (repository instanceof BlobStoreRepository == false) { - return Collections.emptyList(); + return Optional.empty(); } BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - BlobContainer blobContainer = blobStoreRepository.shardContainer(shardSnapshotInfo.getIndexId(), - shardSnapshotInfo.getShardId().getId()); + BlobContainer blobContainer = blobStoreRepository.shardContainer(latestShardSnapshot.getIndexId(), + latestShardSnapshot.getShardId().getId()); BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshot.getSnapshotId()); - return blobStoreIndexShardSnapshot.indexFiles(); + return Optional.of(new ShardSnapshot(latestShardSnapshot, blobStoreIndexShardSnapshot.indexFiles())); } catch (Exception e) { - logger.warn(new ParameterizedMessage("Unable to fetch shard snapshot files for {}", shardSnapshotInfo), e); - return Collections.emptyList(); + logger.warn(new ParameterizedMessage("Unable to fetch shard snapshot files for {}", latestShardSnapshot), e); + return Optional.empty(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java index 088c790d1fb41..35c47550849c4 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java @@ -18,9 +18,8 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; -import java.util.Collections; -import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -48,14 +47,14 @@ public void computeRecoveryPlan(ShardId shardId, // Fallback to source only recovery if the target node is in an incompatible version boolean canUseSnapshots = useSnapshots && targetVersion.onOrAfter(RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION); - fetchAvailableSnapshotsIgnoringErrors(shardId, canUseSnapshots, availableSnapshots -> + fetchLatestSnapshotsIgnoringErrors(shardId, canUseSnapshots, latestSnapshotOpt -> ActionListener.completeWith(listener, () -> computeRecoveryPlanWithSnapshots( sourceMetadata, targetMetadata, startingSeqNo, translogOps, - availableSnapshots + latestSnapshotOpt ) ) ); @@ -65,11 +64,11 @@ private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(Store.MetadataSnapsho Store.MetadataSnapshot targetMetadata, long startingSeqNo, int translogOps, - List availableSnapshots) { + Optional latestSnapshotOpt) { Store.RecoveryDiff sourceTargetDiff = sourceMetadata.recoveryDiff(targetMetadata); List filesMissingInTarget = concatLists(sourceTargetDiff.missing, sourceTargetDiff.different); - if (availableSnapshots.isEmpty()) { + if (latestSnapshotOpt.isPresent() == false) { // If we couldn't find any valid snapshots, fallback to the source return new ShardRecoveryPlan(ShardRecoveryPlan.SnapshotFilesToRecover.EMPTY, filesMissingInTarget, @@ -80,9 +79,7 @@ private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(Store.MetadataSnapsho ); } - ShardSnapshot latestSnapshot = availableSnapshots.stream() - .max(Comparator.comparingLong(ShardSnapshot::getStartedAt)) - .get(); + ShardSnapshot latestSnapshot = latestSnapshotOpt.get(); Store.MetadataSnapshot filesToRecoverFromSourceSnapshot = toMetadataSnapshot(filesMissingInTarget); Store.RecoveryDiff snapshotDiff = filesToRecoverFromSourceSnapshot.recoveryDiff(latestSnapshot.getMetadataSnapshot()); @@ -104,22 +101,22 @@ private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(Store.MetadataSnapsho ); } - private void fetchAvailableSnapshotsIgnoringErrors(ShardId shardId, boolean useSnapshots, Consumer> listener) { + private void fetchLatestSnapshotsIgnoringErrors(ShardId shardId, boolean useSnapshots, Consumer> listener) { if (useSnapshots == false) { - listener.accept(Collections.emptyList()); + listener.accept(Optional.empty()); return; } - ActionListener> listenerIgnoringErrors = new ActionListener>() { + ActionListener> listenerIgnoringErrors = new ActionListener>() { @Override - public void onResponse(List shardSnapshotData) { + public void onResponse(Optional shardSnapshotData) { listener.accept(shardSnapshotData); } @Override public void onFailure(Exception e) { logger.warn(new ParameterizedMessage("Unable to fetch available snapshots for shard {}", shardId), e); - listener.accept(Collections.emptyList()); + listener.accept(Optional.empty()); } }; diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java index 674d0f31b6eac..cbacc5b87a900 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/snapshots/get/shard/GetShardSnapshotResponseSerializationTests.java @@ -41,7 +41,7 @@ public void testSerialization() throws IOException { } private void assertEqualInstances(GetShardSnapshotResponse expectedInstance, GetShardSnapshotResponse newInstance) { - assertThat(newInstance.getRepositoryShardSnapshots(), equalTo(expectedInstance.getRepositoryShardSnapshots())); + assertThat(newInstance.getLatestShardSnapshot(), equalTo(expectedInstance.getLatestShardSnapshot())); assertEquals(expectedInstance.getRepositoryFailures().keySet(), newInstance.getRepositoryFailures().keySet()); for (Map.Entry expectedEntry : expectedInstance.getRepositoryFailures().entrySet()) { ElasticsearchException expectedException = expectedEntry.getValue(); @@ -61,15 +61,13 @@ private GetShardSnapshotResponse copyInstance(GetShardSnapshotResponse instance) } private GetShardSnapshotResponse createTestInstance() { - Map repositoryShardSnapshots = randomMap(0, randomIntBetween(1, 10), this::repositoryShardSnapshot); + ShardSnapshotInfo latestSnapshot = repositoryShardSnapshot(); Map repositoryFailures = randomMap(0, randomIntBetween(1, 10), this::repositoryFailure); - return new GetShardSnapshotResponse(repositoryShardSnapshots, repositoryFailures); + return new GetShardSnapshotResponse(latestSnapshot, repositoryFailures); } - private Tuple repositoryShardSnapshot() { - String repositoryName = randomString(50); - + private ShardSnapshotInfo repositoryShardSnapshot() { final String indexName = randomString(50); ShardId shardId = new ShardId(indexName, UUIDs.randomBase64UUID(), randomIntBetween(0, 100)); Snapshot snapshot = new Snapshot(randomAlphaOfLength(5), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))); @@ -77,10 +75,7 @@ private Tuple repositoryShardSnapshot() { IndexId indexId = new IndexId(indexName, randomString(25)); String shardStateIdentifier = randomBoolean() ? randomString(30) : null; - return Tuple.tuple( - repositoryName, - new ShardSnapshotInfo(indexId, shardId, snapshot, indexMetadataIdentifier, shardStateIdentifier, randomLongBetween(0, 2048)) - ); + return new ShardSnapshotInfo(indexId, shardId, snapshot, indexMetadataIdentifier, shardStateIdentifier, randomLongBetween(0, 2048)); } private Tuple repositoryFailure() { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java index b1b9392b83657..b8156e4c2d0a0 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerServiceTests.java @@ -47,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -92,7 +93,7 @@ public void testOnlyUsesSourceFilesWhenUseSnapshotsFlagIsFalse() throws Exceptio translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { assert false: "Unexpected call"; } }, @@ -125,9 +126,9 @@ public void testFallbacksToRegularPlanIfThereAreNotAvailableSnapshotsOrThereIsAF translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { if (randomBoolean()) { - listener.onResponse(Collections.emptyList()); + listener.onResponse(Optional.empty()); } else { listener.onFailure(new IOException("Boom!")); } @@ -168,8 +169,8 @@ public void testLogicallyEquivalentSnapshotIsUsed() throws Exception { translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { - listener.onResponse(Collections.singletonList(shardSnapshotData)); + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Optional.of(shardSnapshotData)); } }, true @@ -207,8 +208,8 @@ public void testLogicallyEquivalentSnapshotIsSkippedIfUnderlyingFilesAreDifferen translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { - listener.onResponse(Collections.singletonList(shardSnapshotData)); + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Optional.of(shardSnapshotData)); } }, true @@ -256,8 +257,8 @@ public void testPlannerTriesToUseMostFilesFromSnapshots() throws Exception { translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { - listener.onResponse(availableSnapshots); + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Optional.of(availableSnapshots.get(availableSnapshots.size() - 1))); } }, true @@ -308,8 +309,8 @@ public void testSnapshotsWithADifferentHistoryUUIDAreUsedIfFilesAreShared() thro translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { - listener.onResponse(availableSnapshots); + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Optional.of(availableSnapshots.get(availableSnapshots.size() - 1))); } }, true @@ -344,8 +345,8 @@ public void testFallbacksToSourceOnlyPlanIfTargetNodeIsInUnsupportedVersion() th translogOps, new ShardSnapshotsService(null, null, null, null) { @Override - public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { - listener.onResponse(Collections.singletonList(shardSnapshot)); + public void fetchLatestSnapshotsForShard(ShardId shardId, ActionListener> listener) { + listener.onResponse(Optional.of(shardSnapshot)); } }, true,