Skip to content

Commit

Permalink
nicer
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Sep 10, 2019
1 parent 47fab04 commit b6fc8c0
Show file tree
Hide file tree
Showing 16 changed files with 98 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,21 +315,22 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;

return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason)
&& Objects.equals(generation, status.generation) && state == status.state;
}

@Override
public int hashCode() {
int result = state != null ? state.hashCode() : 0;
result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
result = 31 * result + (reason != null ? reason.hashCode() : 0);
result = 31 * result + (generation != null ? generation.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + ", generation=" + generation + "]";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, Map<IndexId, String[]> shardGenerations, long startTime, String failure,
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
Version version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* This method is called on master after all shards are snapshotted.
*
* @param snapshotId snapshot id
* @param shardGenerations map of indices in snapshot to snapshot shard generation
* @param shardGenerations updated shard generations
* @param startTime start time of the snapshot
* @param failure global failure reason or null
* @param totalShards total number of shards
Expand All @@ -139,7 +139,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
* @param version minimum ES version the repository should be readable by
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, Map<IndexId, String[]> shardGenerations, long startTime, String failure,
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
Version version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public List<IndexId> indicesAfterRemovingSnapshot(SnapshotId snapshotId) {
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId,
final SnapshotState snapshotState,
final Map<IndexId, String[]> shardGenerations) {
final ShardGenerations shardGenerations) {
if (snapshotIds.containsKey(snapshotId.getUUID())) {
// if the snapshot id already exists in the repository data, it means an old master
// that is blocked from the cluster is trying to finalize a snapshot concurrently with
Expand All @@ -198,7 +198,7 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId,
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
newSnapshotStates.put(snapshotId.getUUID(), snapshotState);
Map<IndexId, Set<SnapshotId>> allIndexSnapshots = new HashMap<>(indexSnapshots);
for (final IndexId indexId : shardGenerations.keySet()) {
for (final IndexId indexId : shardGenerations.indices()) {
allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId);
}
return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots,
Expand Down Expand Up @@ -228,7 +228,7 @@ public RepositoryData withGenId(long newGeneration) {
* Pass {@code null} when this instance should not track shard generations while the cluster still
* contains nodes from before {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*/
public RepositoryData removeSnapshot(final SnapshotId snapshotId, @Nullable final Map<IndexId, String[]> updatedShardGenerations) {
public RepositoryData removeSnapshot(final SnapshotId snapshotId, @Nullable final ShardGenerations updatedShardGenerations) {
Map<String, SnapshotId> newSnapshotIds = snapshotIds.values().stream()
.filter(id -> !snapshotId.equals(id))
.collect(Collectors.toMap(SnapshotId::getUUID, Function.identity()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,64 @@

package org.elasticsearch.repositories;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

final class ShardGenerations implements ToXContent {
public final class ShardGenerations implements ToXContent {

public static final ShardGenerations EMPTY = new ShardGenerations(Collections.emptyMap());

private final Map<IndexId, List<String>> shardGenerations;

ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
public ShardGenerations(Map<IndexId, List<String>> shardGenerations) {
this.shardGenerations = shardGenerations;
}

public static ShardGenerations fromSnapshot(SnapshotsInProgress.Entry snapshot) {
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
final Map<IndexId, List<Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus>>> res = new HashMap<>();
for (final ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : snapshot.shards()) {
final ShardId shardId = shard.key;
res.computeIfAbsent(indexLookup.get(shardId.getIndexName()), k -> new ArrayList<>()).add(new Tuple<>(shardId, shard.value));
}
return new ShardGenerations(res.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> {
final List<Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus>> status = entry.getValue();
final String[] gens = new String[
status.stream().mapToInt(s -> s.v1().getId())
.max().orElseThrow(() -> new AssertionError("0-shard index is impossible")) + 1];
for (Tuple<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : status) {
if (shard.v2().state().failed() == false) {
final int id = shard.v1().getId();
assert gens[id] == null;
gens[id] = shard.v2().generation();
}
}
return Arrays.asList(gens);
})));
}

public List<IndexId> indices() {
return List.copyOf(shardGenerations.keySet());
}

/**
* Computes the obsolete shard index generations that can be deleted if this instance was written to the repository.
*
Expand Down Expand Up @@ -91,14 +126,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

ShardGenerations updatedGenerations(final Map<IndexId, String[]> shardGenerations) {
ShardGenerations updatedGenerations(ShardGenerations updates) {
final Map<IndexId, List<String>> updatedGenerations = new HashMap<>(this.shardGenerations);
shardGenerations.forEach(((indexId, updatedGens) -> {
final List<String> existing = updatedGenerations.put(indexId, Arrays.asList(updatedGens));
updates.shardGenerations.forEach(((indexId, updatedGens) -> {
final List<String> existing = updatedGenerations.put(indexId, updatedGens);
if (existing != null) {
for (int i = 0; i < updatedGens.length; ++i) {
if (updatedGens[i] == null) {
updatedGens[i] = existing.get(i);
for (int i = 0; i < updatedGens.size(); ++i) {
if (updatedGens.get(i) == null) {
updatedGens.set(i, existing.get(i));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
Expand Down Expand Up @@ -532,12 +533,13 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI
updatedShardGenerations.computeIfAbsent(newGen.indexId, i -> new ArrayList<>()).add(newGen);
}
assert assertShardUpdatesCorrectlyOrdered(updatedShardGenerations.values());
final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, updatedShardGenerations.entrySet().stream()
final RepositoryData newRepoData = repositoryData.removeSnapshot(snapshotId, new ShardGenerations(
updatedShardGenerations.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().stream()
.map(shardSnapshotMetaDeleteResult -> shardSnapshotMetaDeleteResult.newGeneration)
.toArray(String[]::new))));
.collect(Collectors.toList())))));

// Write out new RepositoryData
writeIndexGen(newRepoData, repositoryStateId, version);
Expand Down Expand Up @@ -762,7 +764,7 @@ private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexI

@Override
public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final Map<IndexId, String[]> shardGenerations,
final ShardGenerations shardGenerations,
final long startTime,
final String failure,
final int totalShards,
Expand All @@ -772,8 +774,9 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final MetaData clusterMetaData,
final Map<String, Object> userMetadata,
final Version version) {
final List<IndexId> indices = shardGenerations.indices();
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
shardGenerations.keySet().stream().map(IndexId::getName).collect(Collectors.toList()),
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);

Expand All @@ -788,7 +791,7 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);

// write the index metadata for each index in the snapshot
for (IndexId index : shardGenerations.keySet()) {
for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -567,7 +568,7 @@ private void cleanupAfterError(Exception exception) {
try {
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
shardGenerations(snapshot),
ShardGenerations.fromSnapshot(snapshot),
snapshot.startTime(),
ExceptionsHelper.detailedMessage(exception),
0,
Expand All @@ -588,28 +589,6 @@ private void cleanupAfterError(Exception exception) {
}
}

private static Map<IndexId, String[]> shardGenerations(SnapshotsInProgress.Entry snapshot) {
final Map<String, IndexId> indexLookup = new HashMap<>();
snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
final Map<IndexId, List<Tuple<ShardId, ShardSnapshotStatus>>> res = new HashMap<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : snapshot.shards()) {
final IndexId indexId = indexLookup.get(shard.key.getIndexName());
assert indexId != null;
res.computeIfAbsent(indexId, k -> new ArrayList<>()).add(new Tuple<>(shard.key, shard.value));
}
return res.entrySet().stream().collect(
Collectors.toMap(Map.Entry::getKey, entry -> {
final String[] gens = new String[entry.getValue().stream().mapToInt(s -> s.v1().getId()).max().orElse(0) + 1];
for (Tuple<ShardId, ShardSnapshotStatus> shard : entry.getValue()) {
if (shard.v2().state().failed() == false) {
assert gens[shard.v1().getId()] == null;
gens[shard.v1().getId()] = shard.v2().generation();
}
}
return gens;
}));
}

private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
if (snapshot.includeGlobalState() == false) {
// Remove global state from the cluster state
Expand Down Expand Up @@ -1032,7 +1011,7 @@ protected void doRun() {
List<SnapshotShardFailure> shardFailures = extractFailure(entry.shards());
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
snapshot.getSnapshotId(),
shardGenerations(entry),
ShardGenerations.fromSnapshot(entry),
entry.startTime(),
failure,
entry.shards().size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Met
}

@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, Map<IndexId, String[]> indices, long startTime, String failure,
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
Version version) {
Expand Down
Loading

0 comments on commit b6fc8c0

Please sign in to comment.