Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compute latest snapshot directly in TransportGetShardSnapshotAction #76254

Merged
merged 4 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -137,8 +137,8 @@ public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Ex
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId);
assertThat(shardSnapshotData, is(empty()));
Optional<ShardSnapshot> shardSnapshot = getLatestShardSnapshot(shardId);
assertThat(shardSnapshot.isPresent(), is(equalTo(false)));
}

public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
Expand Down Expand Up @@ -171,18 +171,22 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
createSnapshot(repositoryName, snapshotName, indexName);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
if (numberOfRecoveryEnabledRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));

ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand All @@ -199,12 +203,14 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep

int numberOfFailingRepos = randomIntBetween(1, 3);
List<Tuple<String, Path>> failingRepos = new ArrayList<>();
List<String> failingRepoNames = new ArrayList<>();
for (int i = 0; i < numberOfFailingRepos; i++) {
String repositoryName = "failing-repo-" + i;
Path repoPath = randomRepoPath();
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true);
createSnapshot(repositoryName, snapshotName, indexName);
failingRepos.add(Tuple.tuple(repositoryName, repoPath));
failingRepoNames.add(repositoryName);
}

int numberOfWorkingRepositories = randomIntBetween(0, 4);
Expand All @@ -227,20 +233,25 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep
assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1())
.setType(FailingRepoPlugin.TYPE)
.setVerify(false)
.setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath()))
.setSettings(Settings.builder().put(repoFailureType, true).put("location", failingRepo.v2()))
);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

if (numberOfWorkingRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));
ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(workingRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(failingRepoNames.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand Down Expand Up @@ -268,15 +279,15 @@ protected boolean masterSupportsFetchingLatestSnapshots() {
}
};

PlainActionFuture<List<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots);
assertThat(latestSnapshots.actionGet(), is(empty()));
assertThat(latestSnapshots.actionGet().isPresent(), is(equalTo(false)));
}

private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception {
private Optional<ShardSnapshot> getLatestShardSnapshot(ShardId shardId) throws Exception {
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();

PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future);
return future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -52,7 +51,7 @@ public void testGetShardSnapshotFromUnknownRepoReturnsAnError() throws Exception

if (useMultipleUnknownRepositories) {
GetShardSnapshotResponse response = responseFuture.get();
assertThat(response.getRepositoryShardSnapshots(), is(anEmptyMap()));
assertThat(response.getLatestShardSnapshot().isPresent(), is(equalTo(false)));

final Map<String, RepositoryException> failures = response.getRepositoryFailures();
for (String repository : repositories) {
Expand Down Expand Up @@ -207,9 +206,11 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP
final String indexName = "test-idx";
createIndexWithContent(indexName);

createSnapshot(failingRepoName, "empty-snap", Collections.singletonList(indexName));
int snapshotIdx = 0;
createSnapshot(failingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
SnapshotInfo latestSnapshot = null;
for (String workingRepoName : workingRepoNames) {
createSnapshot(workingRepoName, "empty-snap", Collections.singletonList(indexName));
latestSnapshot = createSnapshot(workingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with giving all the snapshots unique names, but I'm curious whether this was necessary (and why).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These snapshots can take a short time to finish and some might overlap, in that case we need some tie breaker and I decided to use the name.

}

final MockRepository repository = getRepositoryOnMaster(failingRepoName);
Expand Down Expand Up @@ -238,11 +239,17 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP

for (String workingRepoName : workingRepoNames) {
assertThat(response.getFailureForRepository(workingRepoName).isEmpty(), is(equalTo(true)));
assertThat(response.getIndexShardSnapshotInfoForRepository(workingRepoName).isPresent(), equalTo(true));
}

Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(latestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(latestSnapshot.repository()));
}

public void testGetShardSnapshotInMultipleRepositories() {
public void testGetShardSnapshotInMultipleRepositoriesReturnsTheLatestSnapshot() {
int repoCount = randomIntBetween(2, 10);
List<String> repositories = new ArrayList<>();
for (int i = 0; i < repoCount; i++) {
Expand All @@ -254,21 +261,21 @@ public void testGetShardSnapshotInMultipleRepositories() {
final String indexName = "test-idx";
createIndexWithContent(indexName);

Map<String, SnapshotInfo> repositorySnapshots = new HashMap<>();
int snapshotIdx = 0;
SnapshotInfo expectedLatestSnapshot = null;
for (String repository : repositories) {
repositorySnapshots.put(repository, createSnapshot(repository, "snap-1", Collections.singletonList(indexName)));
expectedLatestSnapshot = createSnapshot(repository, "snap-" + snapshotIdx++, Collections.singletonList(indexName));
}

GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(repositories, indexName, 0).actionGet();

for (String repository : repositories) {
assertThat(response.getFailureForRepository(repository).isEmpty(), is(equalTo(true)));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getIndexShardSnapshotInfoForRepository(repository);
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
assertThat(response.getRepositoryFailures(), is(anEmptyMap()));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(repositorySnapshots.get(repository).snapshot()));
}
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(expectedLatestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(expectedLatestSnapshot.repository()));
}

public void testFailedSnapshotsAreNotReturned() throws Exception {
Expand Down Expand Up @@ -306,12 +313,13 @@ public void testFailedSnapshotsAreNotReturned() throws Exception {
Optional<ShardSnapshotInfo> latestSnapshotForShard = getLatestSnapshotForShard(repoName, indexName, 0);
assertThat(latestSnapshotForShard.isPresent(), equalTo(true));
assertThat(latestSnapshotForShard.get().getSnapshot(), equalTo(snapshotInfo.snapshot()));
assertThat(latestSnapshotForShard.get().getRepository(), equalTo(snapshotInfo.repository()));
}

private Optional<ShardSnapshotInfo> getLatestSnapshotForShard(String repository, String indexName, int shard) {
final GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(Collections.singletonList(repository), indexName, shard)
.actionGet();
return response.getIndexShardSnapshotInfoForRepository(repository);
return response.getLatestShardSnapshot();
}

private PlainActionFuture<GetShardSnapshotResponse> getLatestSnapshotForShardFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardSnapshotInfo;

Expand All @@ -20,38 +21,34 @@
import java.util.Optional;

public class GetShardSnapshotResponse extends ActionResponse {
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap());
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(null, Collections.emptyMap());

private final Map<String, ShardSnapshotInfo> repositoryShardSnapshots;
private final ShardSnapshotInfo latestShardSnapshot;
private final Map<String, RepositoryException> repositoryFailures;

GetShardSnapshotResponse(Map<String, ShardSnapshotInfo> repositoryShardSnapshots, Map<String, RepositoryException> repositoryFailures) {
this.repositoryShardSnapshots = repositoryShardSnapshots;
GetShardSnapshotResponse(@Nullable ShardSnapshotInfo latestShardSnapshot, Map<String, RepositoryException> repositoryFailures) {
this.latestShardSnapshot = latestShardSnapshot;
this.repositoryFailures = repositoryFailures;
}

GetShardSnapshotResponse(StreamInput in) throws IOException {
super(in);
this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new);
this.latestShardSnapshot = in.readOptionalWriteable(ShardSnapshotInfo::new);
this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o));
out.writeOptionalWriteable(latestShardSnapshot);
out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o));
}

public Optional<ShardSnapshotInfo> getIndexShardSnapshotInfoForRepository(String repositoryName) {
return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName));
}

public Optional<RepositoryException> getFailureForRepository(String repository) {
return Optional.ofNullable(repositoryFailures.get(repository));
}

public Map<String, ShardSnapshotInfo> getRepositoryShardSnapshots() {
return repositoryShardSnapshots;
public Optional<ShardSnapshotInfo> getLatestShardSnapshot() {
return Optional.ofNullable(latestShardSnapshot);
}

public Map<String, RepositoryException> getRepositoryFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -41,7 +42,8 @@
import java.util.stream.Collectors;

public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<GetShardSnapshotRequest, GetShardSnapshotResponse> {

private static final Comparator<ShardSnapshotInfo> LATEST_SNAPSHOT_COMPARATOR = Comparator.comparing(ShardSnapshotInfo::getStartedAt)
.thenComparing(snapshotInfo -> snapshotInfo.getSnapshot().getSnapshotId());
private final IndexSnapshotsService indexSnapshotsService;

@Inject
Expand Down Expand Up @@ -125,19 +127,19 @@ private void getShardSnapshots(
private GetShardSnapshotResponse transformToResponse(
Collection<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> shardSnapshots
) {
final Map<String, ShardSnapshotInfo> repositoryShardSnapshot = shardSnapshots.stream()
final Optional<ShardSnapshotInfo> latestSnapshot = shardSnapshots.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity()));
.max(LATEST_SNAPSHOT_COMPARATOR);

final Map<String, RepositoryException> failures = shardSnapshots.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.collect(Collectors.toMap(RepositoryException::repository, Function.identity()));

return new GetShardSnapshotResponse(repositoryShardSnapshot, failures);
return new GetShardSnapshotResponse(latestSnapshot.orElse(null), failures);
}

private Set<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) {
Expand Down
Loading