From 38801031616ddef9ff9fd909dc273a1235268797 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 10 Sep 2019 10:32:04 +0200 Subject: [PATCH] nicer apis more docs --- .../repositories/ShardGenerations.java | 58 +++++++++---------- .../blobstore/BlobStoreRepository.java | 47 +++++---------- .../snapshots/SnapshotsService.java | 12 +++- 3 files changed, 52 insertions(+), 65 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java index e88fcf46008e2..98ba8c50ac317 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java +++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java @@ -19,16 +19,11 @@ package org.elasticsearch.repositories; -import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -48,31 +43,6 @@ public ShardGenerations(Map> shardGenerations) { this.shardGenerations = shardGenerations; } - public static ShardGenerations fromSnapshot(SnapshotsInProgress.Entry snapshot) { - final Map indexLookup = new HashMap<>(); - snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx)); - final Map>> res = new HashMap<>(); - for (final ObjectObjectCursor shard : snapshot.shards()) { - final ShardId shardId = shard.key; - res.computeIfAbsent(indexLookup.get(shardId.getIndexName()), k -> new ArrayList<>()).add(new Tuple<>(shardId, shard.value)); - } - return new ShardGenerations(res.entrySet().stream().collect( - Collectors.toMap(Map.Entry::getKey, entry -> { - final List> status = entry.getValue(); - final String[] gens = new String[ - status.stream().mapToInt(s -> s.v1().getId()) - .max().orElseThrow(() -> new AssertionError("0-shard index is impossible")) + 1]; - for (Tuple shard : status) { - if (shard.v2().state().failed() == false) { - final int id = shard.v1().getId(); - assert gens[id] == null; - gens[id] = shard.v2().generation(); - } - } - return Arrays.asList(gens); - }))); - } - public List indices() { return List.copyOf(shardGenerations.keySet()); } @@ -183,4 +153,32 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(shardGenerations); } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private final Map> raw = new HashMap<>(); + + public Builder add(IndexId indexId, int shardId, String generation) { + raw.computeIfAbsent(indexId, i -> new HashMap<>()).put(shardId, generation); + return this; + } + + public ShardGenerations build() { + return new ShardGenerations(raw.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> { + final int size = entry.getValue().keySet().stream().mapToInt(i -> i).max().orElse(-1) + 1; + final String[] gens = new String[size]; + entry.getValue().forEach((shardId, generation) -> { + gens[shardId] = generation; + }); + return Arrays.asList(gens); + } + ))); + } + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 5a309ab2ffc7f..a4c6950f3c143 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -107,7 +107,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -401,6 +400,12 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Versio /** * Delete a snapshot from the repository in a cluster that does contain one ore more nodes older than * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * The order of operations executed by this method differs the non-BwC delete {@link #doDeleteShardSnapshots} as follows: + * Instead of collecting the new shard generations for each shard touched by the delete and storing them in an updated + * {@link RepositoryData} this method writes and updated {@link RepositoryData} with the deleted snapshot removed as the first step. + * After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation + * and then has all now unreferenced blobs in it deleted. + * * @param snapshotId SnapshotId to delete * @param repositoryStateId Expected repository state id * @param version Node version that must be able to read this repositories contents after the delete has finished @@ -467,7 +472,7 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); // listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot - final StepListener> deleteFromMetaListener = new StepListener<>(); + final StepListener> deleteFromMetaListener = new StepListener<>(); final List indices = repositoryData.indicesAfterRemovingSnapshot(snapshotId); if (indices.isEmpty()) { @@ -476,8 +481,8 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI } else { // Listener that flattens out the delete results for each index - final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>( - ActionListener.map(deleteFromMetaListener, idxs -> idxs.stream().flatMap(List::stream).collect(Collectors.toList())), + final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>( + ActionListener.map(deleteFromMetaListener, idxs -> idxs.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); for (IndexId indexId : indices) { @@ -498,13 +503,8 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI assert shardCount > 0; // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index - final ActionListener allShardsListener = new GroupedActionListener<>( - ActionListener.map(deleteIdxMetaListener, shardGenerations -> { - assert shardGenerations.size() == shardGenerations.stream().mapToInt(s -> s.shardId).max() - .orElseThrow(() -> new AssertionError("Empty shard gen array")) + 1 - : "Highest shard id was larger than the number of shard generation updates received."; - return List.copyOf(shardGenerations); - }), shardCount); + final ActionListener allShardsListener = + new GroupedActionListener<>(deleteIdxMetaListener, shardCount); for (int shardId = 0; shardId < shardCount; shardId++) { deleteShardSnapshotFromMeta(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), @@ -528,18 +528,11 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI // will not be referenced by the existing RepositoryData and new RepositoryData is only written if all shard paths have been // successfully updated. deleteFromMetaListener.whenComplete(newGens -> { - final Map> updatedShardGenerations = new HashMap<>(); + final ShardGenerations.Builder builder = ShardGenerations.builder(); for (ShardSnapshotMetaDeleteResult newGen : newGens) { - updatedShardGenerations.computeIfAbsent(newGen.indexId, i -> new ArrayList<>()).add(newGen); + builder.add(newGen.indexId, newGen.shardId, newGen.newGeneration); } - assert assertShardUpdatesCorrectlyOrdered(updatedShardGenerations.values()); - final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, new ShardGenerations( - updatedShardGenerations.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().stream() - .map(shardSnapshotMetaDeleteResult -> shardSnapshotMetaDeleteResult.newGeneration) - .collect(Collectors.toList()))))); + final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); // Write out new RepositoryData writeIndexGen(newRepoData, repositoryStateId, version); @@ -560,18 +553,6 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI }, listener::onFailure); } - // Make sure that each of the given lists is only contains a single index and is ordered by shard id such that the shard id is equal - // to the position in the list. - private static boolean assertShardUpdatesCorrectlyOrdered(Collection> updates) { - updates.forEach(chunk -> { - for (int i = 0; i < chunk.size(); i++) { - assert chunk.get(i).shardId == i; - } - assert chunk.stream().map(s -> s.indexId).distinct().count() == 1L; - }); - return true; - } - private void cleanupStaleBlobs(Map foundIndices, Set rootBlobs, RepositoryData newRepoData, Collection indices, ActionListener listener) { final GroupedActionListener afterCleanup = new GroupedActionListener<>(ActionListener.map(listener, v -> null), 2); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index c76a980aa8510..c9b530fffa958 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -157,6 +157,14 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN } } + private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + final Map indexLookup = new HashMap<>(); + snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx)); + snapshot.shards().forEach(c -> builder.add(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation())); + return builder.build(); + } + /** * Gets the {@link RepositoryData} for the given repository. * @@ -568,7 +576,7 @@ private void cleanupAfterError(Exception exception) { try { repositoriesService.repository(snapshot.snapshot().getRepository()) .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - ShardGenerations.fromSnapshot(snapshot), + buildGenerations(snapshot), snapshot.startTime(), ExceptionsHelper.detailedMessage(exception), 0, @@ -1011,7 +1019,7 @@ protected void doRun() { List shardFailures = extractFailure(entry.shards()); SnapshotInfo snapshotInfo = repository.finalizeSnapshot( snapshot.getSnapshotId(), - ShardGenerations.fromSnapshot(entry), + buildGenerations(entry), entry.startTime(), failure, entry.shards().size(),