Skip to content

Commit

Permalink
Compute latest snapshot directly in TransportGetShardSnapshotAction (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored Aug 11, 2021
1 parent 0fb10cc commit ca1f4dd
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -137,8 +137,8 @@ public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Ex
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
ShardId shardId = getShardIdForIndex(indexName);

List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId);
assertThat(shardSnapshotData, is(empty()));
Optional<ShardSnapshot> shardSnapshot = getLatestShardSnapshot(shardId);
assertThat(shardSnapshot.isPresent(), is(equalTo(false)));
}

public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
Expand Down Expand Up @@ -171,18 +171,22 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
createSnapshot(repositoryName, snapshotName, indexName);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfRecoveryEnabledRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(recoveryEnabledRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
if (numberOfRecoveryEnabledRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));

ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
assertThat(recoveryEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand All @@ -199,12 +203,14 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep

int numberOfFailingRepos = randomIntBetween(1, 3);
List<Tuple<String, Path>> failingRepos = new ArrayList<>();
List<String> failingRepoNames = new ArrayList<>();
for (int i = 0; i < numberOfFailingRepos; i++) {
String repositoryName = "failing-repo-" + i;
Path repoPath = randomRepoPath();
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath, true);
createSnapshot(repositoryName, snapshotName, indexName);
failingRepos.add(Tuple.tuple(repositoryName, repoPath));
failingRepoNames.add(repositoryName);
}

int numberOfWorkingRepositories = randomIntBetween(0, 4);
Expand All @@ -227,20 +233,25 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep
assertAcked(client().admin().cluster().preparePutRepository(failingRepo.v1())
.setType(FailingRepoPlugin.TYPE)
.setVerify(false)
.setSettings(Settings.builder().put(repoFailureType, true).put("location", randomRepoPath()))
.setSettings(Settings.builder().put(repoFailureType, true).put("location", failingRepo.v2()))
);
}

List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
Optional<ShardSnapshot> latestShardSnapshot = getLatestShardSnapshot(shardId);

if (numberOfWorkingRepositories == 0) {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(false)));
} else {
assertThat(latestShardSnapshot.isPresent(), is(equalTo(true)));
ShardSnapshot shardSnapshotData = latestShardSnapshot.get();
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(workingRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(true)));
assertThat(failingRepoNames.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));

assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories)));
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));
}
}

Expand Down Expand Up @@ -268,15 +279,15 @@ protected boolean masterSupportsFetchingLatestSnapshots() {
}
};

PlainActionFuture<List<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots);
assertThat(latestSnapshots.actionGet(), is(empty()));
assertThat(latestSnapshots.actionGet().isPresent(), is(equalTo(false)));
}

private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception {
private Optional<ShardSnapshot> getLatestShardSnapshot(ShardId shardId) throws Exception {
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();

PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future);
return future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -52,7 +51,7 @@ public void testGetShardSnapshotFromUnknownRepoReturnsAnError() throws Exception

if (useMultipleUnknownRepositories) {
GetShardSnapshotResponse response = responseFuture.get();
assertThat(response.getRepositoryShardSnapshots(), is(anEmptyMap()));
assertThat(response.getLatestShardSnapshot().isPresent(), is(equalTo(false)));

final Map<String, RepositoryException> failures = response.getRepositoryFailures();
for (String repository : repositories) {
Expand Down Expand Up @@ -207,9 +206,11 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP
final String indexName = "test-idx";
createIndexWithContent(indexName);

createSnapshot(failingRepoName, "empty-snap", Collections.singletonList(indexName));
int snapshotIdx = 0;
createSnapshot(failingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
SnapshotInfo latestSnapshot = null;
for (String workingRepoName : workingRepoNames) {
createSnapshot(workingRepoName, "empty-snap", Collections.singletonList(indexName));
latestSnapshot = createSnapshot(workingRepoName, "empty-snap-" + snapshotIdx++, Collections.singletonList(indexName));
}

final MockRepository repository = getRepositoryOnMaster(failingRepoName);
Expand Down Expand Up @@ -238,11 +239,17 @@ public void testGetShardSnapshotFailureHandlingLetOtherRepositoriesRequestsMakeP

for (String workingRepoName : workingRepoNames) {
assertThat(response.getFailureForRepository(workingRepoName).isEmpty(), is(equalTo(true)));
assertThat(response.getIndexShardSnapshotInfoForRepository(workingRepoName).isPresent(), equalTo(true));
}

Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(latestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(latestSnapshot.repository()));
}

public void testGetShardSnapshotInMultipleRepositories() {
public void testGetShardSnapshotInMultipleRepositoriesReturnsTheLatestSnapshot() {
int repoCount = randomIntBetween(2, 10);
List<String> repositories = new ArrayList<>();
for (int i = 0; i < repoCount; i++) {
Expand All @@ -254,21 +261,21 @@ public void testGetShardSnapshotInMultipleRepositories() {
final String indexName = "test-idx";
createIndexWithContent(indexName);

Map<String, SnapshotInfo> repositorySnapshots = new HashMap<>();
int snapshotIdx = 0;
SnapshotInfo expectedLatestSnapshot = null;
for (String repository : repositories) {
repositorySnapshots.put(repository, createSnapshot(repository, "snap-1", Collections.singletonList(indexName)));
expectedLatestSnapshot = createSnapshot(repository, "snap-" + snapshotIdx++, Collections.singletonList(indexName));
}

GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(repositories, indexName, 0).actionGet();

for (String repository : repositories) {
assertThat(response.getFailureForRepository(repository).isEmpty(), is(equalTo(true)));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getIndexShardSnapshotInfoForRepository(repository);
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
assertThat(response.getRepositoryFailures(), is(anEmptyMap()));
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = response.getLatestShardSnapshot();

ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(repositorySnapshots.get(repository).snapshot()));
}
assertThat(shardSnapshotInfoOpt.isPresent(), equalTo(true));
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
assertThat(shardSnapshotInfo.getSnapshot(), equalTo(expectedLatestSnapshot.snapshot()));
assertThat(shardSnapshotInfo.getRepository(), equalTo(expectedLatestSnapshot.repository()));
}

public void testFailedSnapshotsAreNotReturned() throws Exception {
Expand Down Expand Up @@ -306,12 +313,13 @@ public void testFailedSnapshotsAreNotReturned() throws Exception {
Optional<ShardSnapshotInfo> latestSnapshotForShard = getLatestSnapshotForShard(repoName, indexName, 0);
assertThat(latestSnapshotForShard.isPresent(), equalTo(true));
assertThat(latestSnapshotForShard.get().getSnapshot(), equalTo(snapshotInfo.snapshot()));
assertThat(latestSnapshotForShard.get().getRepository(), equalTo(snapshotInfo.repository()));
}

private Optional<ShardSnapshotInfo> getLatestSnapshotForShard(String repository, String indexName, int shard) {
final GetShardSnapshotResponse response = getLatestSnapshotForShardFuture(Collections.singletonList(repository), indexName, shard)
.actionGet();
return response.getIndexShardSnapshotInfoForRepository(repository);
return response.getLatestShardSnapshot();
}

private PlainActionFuture<GetShardSnapshotResponse> getLatestSnapshotForShardFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardSnapshotInfo;

Expand All @@ -20,38 +21,34 @@
import java.util.Optional;

public class GetShardSnapshotResponse extends ActionResponse {
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(Collections.emptyMap(), Collections.emptyMap());
public static GetShardSnapshotResponse EMPTY = new GetShardSnapshotResponse(null, Collections.emptyMap());

private final Map<String, ShardSnapshotInfo> repositoryShardSnapshots;
private final ShardSnapshotInfo latestShardSnapshot;
private final Map<String, RepositoryException> repositoryFailures;

GetShardSnapshotResponse(Map<String, ShardSnapshotInfo> repositoryShardSnapshots, Map<String, RepositoryException> repositoryFailures) {
this.repositoryShardSnapshots = repositoryShardSnapshots;
GetShardSnapshotResponse(@Nullable ShardSnapshotInfo latestShardSnapshot, Map<String, RepositoryException> repositoryFailures) {
this.latestShardSnapshot = latestShardSnapshot;
this.repositoryFailures = repositoryFailures;
}

GetShardSnapshotResponse(StreamInput in) throws IOException {
super(in);
this.repositoryShardSnapshots = in.readMap(StreamInput::readString, ShardSnapshotInfo::new);
this.latestShardSnapshot = in.readOptionalWriteable(ShardSnapshotInfo::new);
this.repositoryFailures = in.readMap(StreamInput::readString, RepositoryException::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(repositoryShardSnapshots, StreamOutput::writeString, (o, info) -> info.writeTo(o));
out.writeOptionalWriteable(latestShardSnapshot);
out.writeMap(repositoryFailures, StreamOutput::writeString, (o, err) -> err.writeTo(o));
}

public Optional<ShardSnapshotInfo> getIndexShardSnapshotInfoForRepository(String repositoryName) {
return Optional.ofNullable(repositoryShardSnapshots.get(repositoryName));
}

public Optional<RepositoryException> getFailureForRepository(String repository) {
return Optional.ofNullable(repositoryFailures.get(repository));
}

public Map<String, ShardSnapshotInfo> getRepositoryShardSnapshots() {
return repositoryShardSnapshots;
public Optional<ShardSnapshotInfo> getLatestShardSnapshot() {
return Optional.ofNullable(latestShardSnapshot);
}

public Map<String, RepositoryException> getRepositoryFailures() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -41,7 +42,8 @@
import java.util.stream.Collectors;

public class TransportGetShardSnapshotAction extends TransportMasterNodeAction<GetShardSnapshotRequest, GetShardSnapshotResponse> {

private static final Comparator<ShardSnapshotInfo> LATEST_SNAPSHOT_COMPARATOR = Comparator.comparing(ShardSnapshotInfo::getStartedAt)
.thenComparing(snapshotInfo -> snapshotInfo.getSnapshot().getSnapshotId());
private final IndexSnapshotsService indexSnapshotsService;

@Inject
Expand Down Expand Up @@ -125,19 +127,19 @@ private void getShardSnapshots(
private GetShardSnapshotResponse transformToResponse(
Collection<Tuple<Optional<ShardSnapshotInfo>, RepositoryException>> shardSnapshots
) {
final Map<String, ShardSnapshotInfo> repositoryShardSnapshot = shardSnapshots.stream()
final Optional<ShardSnapshotInfo> latestSnapshot = shardSnapshots.stream()
.map(Tuple::v1)
.filter(Objects::nonNull)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toMap(ShardSnapshotInfo::getRepository, Function.identity()));
.max(LATEST_SNAPSHOT_COMPARATOR);

final Map<String, RepositoryException> failures = shardSnapshots.stream()
.map(Tuple::v2)
.filter(Objects::nonNull)
.collect(Collectors.toMap(RepositoryException::repository, Function.identity()));

return new GetShardSnapshotResponse(repositoryShardSnapshot, failures);
return new GetShardSnapshotResponse(latestSnapshot.orElse(null), failures);
}

private Set<String> getRequestedRepositories(GetShardSnapshotRequest request, ClusterState state) {
Expand Down
Loading

0 comments on commit ca1f4dd

Please sign in to comment.