nicer apis more docs
original-brownbear committed Sep 10, 2019
1 parent b6fc8c0 commit 3880103
Showing 3 changed files with 52 additions and 65 deletions.
File 1 of 3: ShardGenerations.java
@@ -19,16 +19,11 @@

package org.elasticsearch.repositories;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -48,31 +43,6 @@ public ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
this.shardGenerations = shardGenerations;
}

public static ShardGenerations fromSnapshot(SnapshotsInProgress.Entry snapshot) {
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
final Map<IndexId, List<Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus>>> res = new HashMap<>();
for (final ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : snapshot.shards()) {
final ShardId shardId = shard.key;
res.computeIfAbsent(indexLookup.get(shardId.getIndexName()), k -> new ArrayList<>()).add(new Tuple<>(shardId, shard.value));
}
return new ShardGenerations(res.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> {
final List<Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus>> status = entry.getValue();
final String[] gens = new String[
status.stream().mapToInt(s -> s.v1().getId())
.max().orElseThrow(() -> new AssertionError("0-shard index is impossible")) + 1];
for (Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : status) {
if (shard.v2().state().failed() == false) {
final int id = shard.v1().getId();
assert gens[id] == null;
gens[id] = shard.v2().generation();
}
}
return Arrays.asList(gens);
})));
}

public List<IndexId> indices() {
return List.copyOf(shardGenerations.keySet());
}
@@ -183,4 +153,32 @@ public boolean equals(final Object o) {
public int hashCode() {
return Objects.hash(shardGenerations);
}

public static Builder builder() {
return new Builder();
}

public static final class Builder {

private final Map<IndexId, Map<Integer, String>> raw = new HashMap<>();

public Builder add(IndexId indexId, int shardId, String generation) {
raw.computeIfAbsent(indexId, i -> new HashMap<>()).put(shardId, generation);
return this;
}

public ShardGenerations build() {
return new ShardGenerations(raw.entrySet().stream().collect(Collectors.toMap(
Map.Entry::getKey,
entry -> {
final int size = entry.getValue().keySet().stream().mapToInt(i -> i).max().orElse(-1) + 1;
final String[] gens = new String[size];
entry.getValue().forEach((shardId, generation) -> {
gens[shardId] = generation;
});
return Arrays.asList(gens);
}
)));
}
}
}
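
For illustration, a minimal sketch of how the new builder API above could be used; the index name, UUID, and generation strings are made up, and IndexId's two-argument (name, uuid) constructor is assumed:

import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.ShardGenerations;

public class ShardGenerationsBuilderExample {
    public static void main(String[] args) {
        final IndexId index = new IndexId("my-index", "my-index-uuid"); // hypothetical index id
        final ShardGenerations generations = ShardGenerations.builder()
            .add(index, 0, "gen-uuid-0") // shard 0 and the generation written for it
            .add(index, 2, "gen-uuid-2") // shard 2; shard 1 deliberately not added
            .build();
        // build() sizes each per-index list by the highest shard id added, so the slot for
        // shard 1 above stays null in the resulting list.
        System.out.println(generations.indices());
    }
}

The design choice in build() is to tolerate gaps rather than fail on them: shard ids that were never added simply end up as null entries in the per-index list.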
File 2 of 3: BlobStoreRepository.java
@@ -107,7 +107,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -401,6 +400,12 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio
/**
* Delete a snapshot from the repository in a cluster that contains one or more nodes older than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
* The order of operations executed by this method differs from that of the non-BwC delete {@link #doDeleteShardSnapshots} as follows:
* Instead of collecting the new shard generations for each shard touched by the delete and storing them in an updated
* {@link RepositoryData}, this method writes an updated {@link RepositoryData} with the deleted snapshot removed as its first step.
* After updating the {@link RepositoryData}, each shard directory is individually first moved to the next shard generation
* and then has all now-unreferenced blobs in it deleted.
*
* @param snapshotId SnapshotId to delete
* @param repositoryStateId Expected repository state id
* @param version Node version that must be able to read this repository's contents after the delete has finished
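
To make the ordering difference described above concrete, here is a small self-contained sketch contrasting the two sequences; every type and method name in it is invented for illustration and is not Elasticsearch API:

import java.util.ArrayList;
import java.util.List;

// Toy model contrasting the two delete orderings described in the javadoc above.
class DeleteOrderingSketch {

    interface ShardDir { // hypothetical stand-in for one shard's blob container
        String writeNextGeneration();   // write the shard's updated metadata, return its generation id
        void deleteUnreferencedBlobs(); // delete blobs no longer referenced by any snapshot
    }

    interface RepoMetadata { // hypothetical stand-in for RepositoryData at the repository root
        void writeWithSnapshotRemoved();                       // BwC: no shard generations recorded
        void writeWithSnapshotRemoved(List<String> shardGens); // non-BwC: records the new shard generations
    }

    // BwC ordering: the root metadata is written first, then every shard directory is
    // moved to its next generation and cleaned up individually.
    static void deleteBwc(RepoMetadata repo, List<ShardDir> shards) {
        repo.writeWithSnapshotRemoved();
        for (ShardDir shard : shards) {
            shard.writeNextGeneration();
            shard.deleteUnreferencedBlobs();
        }
    }

    // Non-BwC ordering (doDeleteShardSnapshots): new shard generations are written and collected
    // first, the root metadata referencing them is written only after all shards succeeded,
    // and unreferenced blobs are deleted last.
    static void deleteNonBwc(RepoMetadata repo, List<ShardDir> shards) {
        final List<String> newGens = new ArrayList<>();
        for (ShardDir shard : shards) {
            newGens.add(shard.writeNextGeneration());
        }
        repo.writeWithSnapshotRemoved(newGens);
        shards.forEach(ShardDir::deleteUnreferencedBlobs);
    }
}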
@@ -467,7 +472,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);

// Listener to complete once all shard folders affected by this delete have had new metadata blobs without this snapshot added to them
final StepListener<List<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();

final List<IndexId> indices = repositoryData.indicesAfterRemovingSnapshot(snapshotId);
if (indices.isEmpty()) {
@@ -476,8 +481,8 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
} else {

// Listener that flattens out the delete results for each index
final ActionListener<List<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
ActionListener.map(deleteFromMetaListener, idxs -> idxs.stream().flatMap(List::stream).collect(Collectors.toList())),
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
ActionListener.map(deleteFromMetaListener, idxs -> idxs.stream().flatMap(Collection::stream).collect(Collectors.toList())),
indices.size());

for (IndexId indexId : indices) {
@@ -498,13 +503,8 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
assert shardCount > 0;

// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(
ActionListener.map(deleteIdxMetaListener, shardGenerations -> {
assert shardGenerations.size() == shardGenerations.stream().mapToInt(s -> s.shardId).max()
.orElseThrow(() -> new AssertionError("Empty shard gen array")) + 1
: "Highest shard id was larger than the number of shard generation updates received.";
return List.copyOf(shardGenerations);
}), shardCount);
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);

for (int shardId = 0; shardId < shardCount; shardId++) {
deleteShardSnapshotFromMeta(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId),
@@ -528,18 +528,11 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
// will not be referenced by the existing RepositoryData and new RepositoryData is only written if all shard paths have been
// successfully updated.
deleteFromMetaListener.whenComplete(newGens -> {
final Map<IndexId, List<ShardSnapshotMetaDeleteResult>> updatedShardGenerations = new HashMap<>();
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : newGens) {
updatedShardGenerations.computeIfAbsent(newGen.indexId, i -> new ArrayList<>()).add(newGen);
builder.add(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
assert assertShardUpdatesCorrectlyOrdered(updatedShardGenerations.values());
final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, new ShardGenerations(
updatedShardGenerations.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().stream()
.map(shardSnapshotMetaDeleteResult -> shardSnapshotMetaDeleteResult.newGeneration)
.collect(Collectors.toList())))));
final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());

// Write out new RepositoryData
writeIndexGen(newRepoData, repositoryStateId, version);
Expand All @@ -560,18 +553,6 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
}, listener::onFailure);
}

// Make sure that each of the given lists only contains a single index and is ordered by shard id such that the shard id is equal
// to the position in the list.
private static boolean assertShardUpdatesCorrectlyOrdered(Collection<List<ShardSnapshotMetaDeleteResult>> updates) {
updates.forEach(chunk -> {
for (int i = 0; i < chunk.size(); i++) {
assert chunk.get(i).shardId == i;
}
assert chunk.stream().map(s -> s.indexId).distinct().count() == 1L;
});
return true;
}

private void cleanupStaleBlobs(Map<String, BlobContainer> foundIndices, Set<String> rootBlobs, RepositoryData newRepoData,
Collection<IndexId> indices, ActionListener<Void> listener) {
final GroupedActionListener<Void> afterCleanup = new GroupedActionListener<>(ActionListener.map(listener, v -> null), 2);
File 3 of 3: SnapshotsService.java
@@ -157,6 +157,14 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN
}
}

private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
ShardGenerations.Builder builder = ShardGenerations.builder();
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
snapshot.shards().forEach(c -> builder.add(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
return builder.build();
}

/**
* Gets the {@link RepositoryData} for the given repository.
*
@@ -568,7 +576,7 @@ private void cleanupAfterError(Exception exception) {
try {
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
ShardGenerations.fromSnapshot(snapshot),
buildGenerations(snapshot),
snapshot.startTime(),
ExceptionsHelper.detailedMessage(exception),
0,
@@ -1011,7 +1019,7 @@ protected void doRun() {
List<SnapshotShardFailure> shardFailures = extractFailure(entry.shards());
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
snapshot.getSnapshotId(),
ShardGenerations.fromSnapshot(entry),
buildGenerations(entry),
entry.startTime(),
failure,
entry.shards().size(),
