diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 4e51334360080..738b261d7fd8e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -201,7 +202,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> blobStoreRepository.cleanup( - repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null))))); + repositoryStateId, + newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION), + ActionListener.wrap(result -> after(null, result), e -> after(e, null))))); } private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) { diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 92a04dd03434a..b40d771c64739 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -91,12 +91,14 @@ public static class Entry implements ToXContent { private final ImmutableOpenMap> waitingIndices; private final long startTime; private final long repositoryStateId; + // see #useShardGenerations + private final boolean useShardGenerations; @Nullable private final Map userMetadata; @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure, Map userMetadata) { + String failure, Map userMetadata, boolean useShardGenerations) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -114,6 +116,7 @@ public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, Sta this.repositoryStateId = repositoryStateId; this.failure = failure; this.userMetadata = userMetadata; + this.useShardGenerations = useShardGenerations; } private static boolean assertShardsConsistent(State state, List indices, @@ -128,20 +131,22 @@ private static boolean assertShardsConsistent(State state, List indices : "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]"; return true; } + public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, - Map userMetadata) { - this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata); + Map userMetadata, boolean useShardGenerations) { + this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata, + useShardGenerations); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, entry.failure, entry.userMetadata); + entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations); } public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure, entry.userMetadata); + entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -192,6 +197,16 @@ public String failure() { return failure; } + /** + * Whether to write to the repository in a format only understood by versions newer than + * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * + * @return true if writing to repository in new format + */ + public boolean useShardGenerations() { + return useShardGenerations; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -207,6 +222,7 @@ public boolean equals(Object o) { if (!snapshot.equals(entry.snapshot)) return false; if (state != entry.state) return false; if (repositoryStateId != entry.repositoryStateId) return false; + if (useShardGenerations != entry.useShardGenerations) return false; return true; } @@ -221,6 +237,7 @@ public int hashCode() { result = 31 * result + indices.hashCode(); result = 31 * result + Long.hashCode(startTime); result = 31 * result + Long.hashCode(repositoryStateId); + result = 31 * result + (useShardGenerations ? 1 : 0); return result; } @@ -503,6 +520,12 @@ public SnapshotsInProgress(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { userMetadata = in.readMap(); } + final boolean useShardGenerations; + if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + useShardGenerations = in.readBoolean(); + } else { + useShardGenerations = false; + } entries[i] = new Entry(snapshot, includeGlobalState, partial, @@ -512,7 +535,8 @@ public SnapshotsInProgress(StreamInput in) throws IOException { repositoryStateId, builder.build(), failure, - userMetadata + userMetadata, + useShardGenerations ); } this.entries = Arrays.asList(entries); @@ -541,6 +565,9 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { out.writeMap(entry.userMetadata); } + if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + out.writeBoolean(entry.useShardGenerations); + } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 1379ad74c95ce..59092cc573bf2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -73,16 +73,17 @@ public RepositoryData getRepositoryData() { } @Override - public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata, ActionListener listener) { - in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metaData, userMetadata, listener); + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + boolean writeShardGens, ActionListener listener) { + in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metaData, userMetadata, writeShardGens, listener); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { - in.deleteSnapshot(snapshotId, repositoryStateId, listener); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); } @Override @@ -117,8 +118,9 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { - in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, listener); + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { + in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener); } @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) { diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 53142920dc835..31687fbe0f8ec 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -112,28 +112,33 @@ default Repository create(RepositoryMetaData metaData, Function * This method is called on master after all shards are snapshotted. * - * @param snapshotId snapshot id - * @param indices list of indices in the snapshot - * @param startTime start time of the snapshot - * @param failure global failure reason or null - * @param totalShards total number of shards - * @param shardFailures list of shard failures - * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began + * @param snapshotId snapshot id + * @param shardGenerations updated shard generations + * @param startTime start time of the snapshot + * @param failure global failure reason or null + * @param totalShards total number of shards + * @param shardFailures list of shard failures + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began * @param includeGlobalState include cluster global state + * @param clusterMetaData cluster metadata + * @param userMetadata user metadata + * @param writeShardGens if shard generations should be written to the repository * @param listener listener to be called on completion of the snapshot */ - void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData clusterMetaData, Map userMetadata, ActionListener listener); + void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData clusterMetaData, Map userMetadata, + boolean writeShardGens, ActionListener listener); /** * Deletes snapshot * - * @param snapshotId snapshot id + * @param snapshotId snapshot id * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began - * @param listener completion listener + * @param writeShardGens if shard generations should be written to the repository + * @param listener completion listener */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds @@ -195,7 +200,7 @@ void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTi * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus, ActionListener listener); + IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener); /** * Restores snapshot of the shard. diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index edcb9b9ff54c3..8589d2efdfc6d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotState; @@ -55,7 +56,7 @@ public final class RepositoryData { * An instance initialized for an empty repository. */ public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, - Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); /** * The generational id of the index file from which the repository data was read. @@ -78,19 +79,30 @@ public final class RepositoryData { */ private final Map> indexSnapshots; + /** + * Shard generations. + */ + private final ShardGenerations shardGenerations; public RepositoryData(long genId, Map snapshotIds, Map snapshotStates, - Map> indexSnapshots) { + Map> indexSnapshots, ShardGenerations shardGenerations) { this.genId = genId; this.snapshotIds = Collections.unmodifiableMap(snapshotIds); this.snapshotStates = Collections.unmodifiableMap(snapshotStates); this.indices = Collections.unmodifiableMap(indexSnapshots.keySet().stream() .collect(Collectors.toMap(IndexId::getName, Function.identity()))); this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots); + this.shardGenerations = Objects.requireNonNull(shardGenerations); + assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices " + + shardGenerations.indices() + " but snapshots only reference indices " + indices.values(); } protected RepositoryData copy() { - return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots); + return new RepositoryData(genId, snapshotIds, snapshotStates, indexSnapshots, shardGenerations); + } + + public ShardGenerations shardGenerations() { + return shardGenerations; } /** @@ -140,10 +152,15 @@ public List indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId) /** * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot * already exists in the repository data, this method throws an IllegalArgumentException. + * + * @param snapshotId Id of the new snapshot + * @param snapshotState State of the new snapshot + * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new + * generations indexed by the shard id they correspond to must be supplied. */ public RepositoryData addSnapshot(final SnapshotId snapshotId, final SnapshotState snapshotState, - final List snapshottedIndices) { + final ShardGenerations shardGenerations) { if (snapshotIds.containsKey(snapshotId.getUUID())) { // if the snapshot id already exists in the repository data, it means an old master // that is blocked from the cluster is trying to finalize a snapshot concurrently with @@ -155,10 +172,11 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId, Map newSnapshotStates = new HashMap<>(snapshotStates); newSnapshotStates.put(snapshotId.getUUID(), snapshotState); Map> allIndexSnapshots = new HashMap<>(indexSnapshots); - for (final IndexId indexId : snapshottedIndices) { + for (final IndexId indexId : shardGenerations.indices()) { allIndexSnapshots.computeIfAbsent(indexId, k -> new LinkedHashSet<>()).add(snapshotId); } - return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots); + return new RepositoryData(genId, snapshots, newSnapshotStates, allIndexSnapshots, + ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build()); } /** @@ -171,13 +189,18 @@ public RepositoryData withGenId(long newGeneration) { if (newGeneration == genId) { return this; } - return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots); + return new RepositoryData(newGeneration, this.snapshotIds, this.snapshotStates, this.indexSnapshots, this.shardGenerations); } /** * Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot. + * + * @param snapshotId Snapshot Id + * @param updatedShardGenerations Shard generations that changed as a result of removing the snapshot. + * The {@code String[]} passed for each {@link IndexId} contains the new shard generation id for each + * changed shard indexed by its shardId */ - public RepositoryData removeSnapshot(final SnapshotId snapshotId) { + public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGenerations updatedShardGenerations) { Map newSnapshotIds = snapshotIds.values().stream() .filter(id -> !snapshotId.equals(id)) .collect(Collectors.toMap(SnapshotId::getUUID, Function.identity())); @@ -205,7 +228,10 @@ public RepositoryData removeSnapshot(final SnapshotId snapshotId) { indexSnapshots.put(indexId, set); } - return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots); + return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, indexSnapshots, + ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations) + .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build() + ); } /** @@ -231,12 +257,13 @@ public boolean equals(Object obj) { return snapshotIds.equals(that.snapshotIds) && snapshotStates.equals(that.snapshotStates) && indices.equals(that.indices) - && indexSnapshots.equals(that.indexSnapshots); + && indexSnapshots.equals(that.indexSnapshots) + && shardGenerations.equals(that.shardGenerations); } @Override public int hashCode() { - return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots); + return Objects.hash(snapshotIds, snapshotStates, indices, indexSnapshots, shardGenerations); } /** @@ -276,6 +303,7 @@ public List resolveNewIndices(final List indicesToResolve) { return snapshotIndices; } + private static final String SHARD_GENERATIONS = "shard_generations"; private static final String SNAPSHOTS = "snapshots"; private static final String INDICES = "indices"; private static final String INDEX_ID = "id"; @@ -286,7 +314,10 @@ public List resolveNewIndices(final List indicesToResolve) { /** * Writes the snapshots metadata and the related indices metadata to x-content. */ - public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws IOException { + public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException { + assert shouldWriteShardGens || shardGenerations.indices().isEmpty() : + "Should not build shard generations in BwC mode but saw generations [" + shardGenerations + "]"; + builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); @@ -312,6 +343,13 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder) throws builder.value(snapshotId.getUUID()); } builder.endArray(); + if (shouldWriteShardGens) { + builder.startArray(SHARD_GENERATIONS); + for (String gen : shardGenerations.getGens(indexId)) { + builder.value(gen); + } + builder.endArray(); + } builder.endObject(); } builder.endObject(); @@ -326,6 +364,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser, final Map snapshots = new HashMap<>(); final Map snapshotStates = new HashMap<>(); final Map> indexSnapshots = new HashMap<>(); + final ShardGenerations.Builder shardGenerations = ShardGenerations.builder(); if (parser.nextToken() == XContentParser.Token.START_OBJECT) { while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { @@ -363,6 +402,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser, while (parser.nextToken() != XContentParser.Token.END_OBJECT) { final String indexName = parser.currentName(); final Set snapshotIds = new LinkedHashSet<>(); + final List gens = new ArrayList<>(); IndexId indexId = null; if (parser.nextToken() != XContentParser.Token.START_OBJECT) { @@ -405,10 +445,20 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser, + " references an unknown snapshot uuid [" + uuid + "]"); } } + } else if (SHARD_GENERATIONS.equals(indexMetaFieldName)) { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, parser.currentToken(), parser::getTokenLocation); + while (parser.nextToken() != XContentParser.Token.END_ARRAY) { + gens.add(parser.textOrNull()); + } + } } assert indexId != null; indexSnapshots.put(indexId, snapshotIds); + for (int i = 0; i < gens.size(); i++) { + shardGenerations.put(indexId, i, gens.get(i)); + } } } else { throw new ElasticsearchParseException("unknown field name [" + field + "]"); @@ -417,7 +467,7 @@ public static RepositoryData snapshotsFromXContent(final XContentParser parser, } else { throw new ElasticsearchParseException("start object expected"); } - return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots); + return new RepositoryData(genId, snapshots, snapshotStates, indexSnapshots, shardGenerations.build()); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java new file mode 100644 index 0000000000000..6351d5e2f2bf0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/ShardGenerations.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.common.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +public final class ShardGenerations { + + public static final ShardGenerations EMPTY = new ShardGenerations(Collections.emptyMap()); + + /** + * Special generation that signifies that a shard is new and the repository does not yet contain a valid + * {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for it. + */ + public static final String NEW_SHARD_GEN = "_new"; + + /** + * Special generation that signifies that the shard has been deleted from the repository. + * This generation is only used during computations. It should never be written to disk. + */ + public static final String DELETED_SHARD_GEN = "_deleted"; + + private final Map> shardGenerations; + + private ShardGenerations(Map> shardGenerations) { + this.shardGenerations = shardGenerations; + } + + /** + * Returns all indices for which shard generations are tracked. + * + * @return indices for which shard generations are tracked + */ + public Collection indices() { + return Collections.unmodifiableSet(shardGenerations.keySet()); + } + + /** + * Computes the obsolete shard index generations that can be deleted once this instance was written to the repository. + * Note: This method should only be used when finalizing a snapshot and we can safely assume that data has only been added but not + * removed from shard paths. + * + * @param previous Previous {@code ShardGenerations} + * @return Map of obsolete shard index generations in indices that are still tracked by this instance + */ + public Map> obsoleteShardGenerations(ShardGenerations previous) { + final Map> result = new HashMap<>(); + previous.shardGenerations.forEach(((indexId, oldGens) -> { + final List updatedGenerations = shardGenerations.get(indexId); + final Map obsoleteShardIndices = new HashMap<>(); + assert updatedGenerations != null + : "Index [" + indexId + "] present in previous shard generations, but missing from updated generations"; + for (int i = 0; i < Math.min(oldGens.size(), updatedGenerations.size()); i++) { + final String oldGeneration = oldGens.get(i); + final String updatedGeneration = updatedGenerations.get(i); + // If we had a previous generation that is different from an updated generation it's obsolete + // Since this method assumes only additions and no removals of shards, a null updated generation means no update + if (updatedGeneration != null && oldGeneration != null && oldGeneration.equals(updatedGeneration) == false) { + obsoleteShardIndices.put(i, oldGeneration); + } + } + result.put(indexId, Collections.unmodifiableMap(obsoleteShardIndices)); + })); + return Collections.unmodifiableMap(result); + } + + /** + * Get the generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob for a given index + * and shard. + * There are three special kinds of generations that can be returned here. + *
    + *
  • {@link #DELETED_SHARD_GEN} a deleted shard that isn't referenced by any snapshot in the repository any longer
  • + *
  • {@link #NEW_SHARD_GEN} a new shard that we know doesn't hold any valid data yet in the repository
  • + *
  • {@code null} unknown state. The shard either does not exist at all or it was created by a node older than + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. If a caller expects a shard to exist in the + * repository but sees a {@code null} return, it should try to recover the generation by falling back to listing the contents + * of the respective shard directory.
  • + *
+ * + * @param indexId IndexId + * @param shardId Shard Id + * @return generation of the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob + */ + @Nullable + public String getShardGen(IndexId indexId, int shardId) { + final List generations = shardGenerations.get(indexId); + if (generations == null || generations.size() < shardId + 1) { + return null; + } + return generations.get(shardId); + } + + public List getGens(IndexId indexId) { + final List existing = shardGenerations.get(indexId); + return existing == null ? Collections.emptyList() : Collections.unmodifiableList(existing); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ShardGenerations that = (ShardGenerations) o; + return shardGenerations.equals(that.shardGenerations); + } + + @Override + public int hashCode() { + return Objects.hash(shardGenerations); + } + + @Override + public String toString() { + return "ShardGenerations{generations:" + this.shardGenerations + "}"; + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + + private final Map> generations = new HashMap<>(); + + /** + * Filters out all generations that don't belong to any of the supplied {@code indices} and prunes all {@link #DELETED_SHARD_GEN} + * entries from the builder. + * + * @param indices indices to filter for + * @return builder that contains only the given {@code indices} and no {@link #DELETED_SHARD_GEN} entries + */ + public Builder retainIndicesAndPruneDeletes(Set indices) { + generations.keySet().retainAll(indices); + for (IndexId index : indices) { + final Map shards = generations.getOrDefault(index, Collections.emptyMap()); + final Iterator> iterator = shards.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + final String generation = entry.getValue(); + if (generation.equals(DELETED_SHARD_GEN)) { + iterator.remove(); + } + } + if (shards.isEmpty()) { + generations.remove(index); + } + } + return this; + } + + public Builder putAll(ShardGenerations shardGenerations) { + shardGenerations.shardGenerations.forEach((indexId, gens) -> { + for (int i = 0; i < gens.size(); i++) { + final String gen = gens.get(i); + if (gen != null) { + put(indexId, i, gens.get(i)); + } + } + }); + return this; + } + + public Builder put(IndexId indexId, int shardId, String generation) { + generations.computeIfAbsent(indexId, i -> new HashMap<>()).put(shardId, generation); + return this; + } + + public ShardGenerations build() { + return new ShardGenerations(generations.entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> { + final Set shardIds = entry.getValue().keySet(); + assert shardIds.isEmpty() == false; + final int size = shardIds.stream().mapToInt(i -> i).max().getAsInt() + 1; + // Create a list that can hold the highest shard id as index and leave null values for shards that don't have + // a map entry. + final String[] gens = new String[size]; + entry.getValue().forEach((shardId, generation) -> gens[shardId] = generation); + return Arrays.asList(gens); + } + ))); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2ceca4da5f4e3..1c1f427c89960 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -91,11 +91,13 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; import org.elasticsearch.snapshots.SnapshotShardFailure; +import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; @@ -359,7 +361,7 @@ public RepositoryMetaData getMetadata() { } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); } else { @@ -369,7 +371,7 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); - doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener); + doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, writeShardGens, listener); } catch (Exception ex) { listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); } @@ -390,47 +392,170 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action * @param listener Listener to invoke once finished */ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map foundIndices, - Map rootBlobs, RepositoryData repositoryData, + Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, ActionListener listener) throws IOException { - final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); - writeIndexGen(updatedRepositoryData, repositoryStateId); - final ActionListener afterCleanupsListener = - new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); - - // Run unreferenced blobs cleanup in parallel to snapshot deletion - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener, - l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null)))); - - deleteIndices( - updatedRepositoryData, - repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId), - snapshotId, - ActionListener.runAfter( - ActionListener.wrap( - deleteResults -> { - // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths) - // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result - final String basePath = basePath().buildAsString(); - final int basePathLen = basePath.length(); - blobContainer().deleteBlobsIgnoringIfNotExists( - Stream.concat( - deleteResults.stream().flatMap(shardResult -> { - final String shardPath = - shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); - }), - deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId -> - indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID())) - ).map(absolutePath -> { - assert absolutePath.startsWith(basePath); - return absolutePath.substring(basePathLen); - }).collect(Collectors.toList())); - }, - // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request - e -> logger.warn( - () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)), - () -> afterCleanupsListener.onResponse(null)) - ); + + if (writeShardGens) { + // First write the new shard state metadata (with the removed snapshot) and compute deletion targets + final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); + // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: + // 1. Remove the snapshot from the list of existing snapshots + // 2. Update the index shard generations of all updated shard folders + // + // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created + // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only + // written if all shard paths have been successfully updated. + final StepListener writeUpdatedRepoDataStep = new StepListener<>(); + writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); + } + final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); + writeIndexGen(updatedRepoData, repositoryStateId, true); + writeUpdatedRepoDataStep.onResponse(updatedRepoData); + }, listener::onFailure); + // Once we have updated the repository, run the clean-ups + writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); + asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener); + }, listener::onFailure); + } else { + // Write the new repository data first (with the removed snapshot), using no shard generations + final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY); + writeIndexGen(updatedRepoData, repositoryStateId, false); + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); + asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep); + writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> + asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure); + } + } + + private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map foundIndices, Map rootBlobs, + RepositoryData updatedRepoData, ActionListener listener) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( + listener, + l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null)))); + } + + private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection deleteResults, + ActionListener listener) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( + listener, + l -> { + try { + blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults)); + l.onResponse(null); + } catch (Exception e) { + logger.warn( + () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), + e); + throw e; + } + })); + } + + // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up. + private void writeUpdatedShardMetaDataAndComputeDeletes(SnapshotId snapshotId, RepositoryData oldRepositoryData, + boolean useUUIDs, ActionListener> onAllShardsCompleted) { + + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final List indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId); + + if (indices.isEmpty()) { + onAllShardsCompleted.onResponse(Collections.emptyList()); + return; + } + + // Listener that flattens out the delete results for each index + final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>( + ActionListener.map(onAllShardsCompleted, + res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); + + for (IndexId indexId : indices) { + final Set survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream() + .filter(id -> id.equals(snapshotId) == false).collect(Collectors.toSet()); + executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, deleteIdxMetaListener -> { + final IndexMetaData indexMetaData; + try { + indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); + } catch (Exception ex) { + logger.warn(() -> + new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); + // Just invoke the listener without any shard generations to count it down, this index will be cleaned up + // by the stale data cleanup in the end. + // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just ignoring + // it and letting the cleanup deal with it. + deleteIdxMetaListener.onResponse(null); + return; + } + final int shardCount = indexMetaData.getNumberOfShards(); + assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; + // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index + final ActionListener allShardsListener = + new GroupedActionListener<>(deleteIdxMetaListener, shardCount); + final Index index = indexMetaData.getIndex(); + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + final ShardId shard = new ShardId(index, shardId); + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + final BlobContainer shardContainer = shardContainer(indexId, shard); + final Set blobs = getShardBlobs(shard, shardContainer); + final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; + final String newGen; + if (useUUIDs) { + newGen = UUIDs.randomBase64UUID(); + blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(blobs, shardContainer, + oldRepositoryData.shardGenerations().getShardGen(indexId, shard.getId())).v1(); + } else { + Tuple tuple = + buildBlobStoreIndexShardSnapshots(blobs, shardContainer); + newGen = Long.toString(tuple.v2() + 1); + blobStoreIndexShardSnapshots = tuple.v1(); + } + allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, shard, snapshotId, + shardContainer, blobs, blobStoreIndexShardSnapshots, newGen)); + } + + @Override + public void onFailure(Exception ex) { + logger.warn( + () -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, indexId.getName(), shard.id()), ex); + // Just passing null here to count down the listener instead of failing it, the stale data left behind + // here will be retried in the next delete or repository cleanup + allShardsListener.onResponse(null); + } + }); + } + })); + } + } + + private List resolveFilesToDelete(SnapshotId snapshotId, Collection deleteResults) { + final String basePath = basePath().buildAsString(); + final int basePathLen = basePath.length(); + return Stream.concat( + deleteResults.stream().flatMap(shardResult -> { + final String shardPath = + shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), + deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId -> + indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID())) + ).map(absolutePath -> { + assert absolutePath.startsWith(basePath); + return absolutePath.substring(basePathLen); + }).collect(Collectors.toList()); } /** @@ -472,9 +597,10 @@ private void cleanupStaleBlobs(Map foundIndices, MapDeleting unreferenced root level blobs {@link #cleanupStaleRootFiles} * * @param repositoryStateId Current repository state id + * @param writeShardGens If shard generations should be written to the repository * @param listener Listener to complete when done */ - public void cleanup(long repositoryStateId, ActionListener listener) { + public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener listener) { try { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"); @@ -496,7 +622,7 @@ public void cleanup(long repositoryStateId, ActionListener foundIndices return deleteResult; } - /** - * @param repositoryData RepositoryData with the snapshot removed - * @param indices Indices to remove the snapshot from (should not contain indices that become completely unreferenced with the - * removal of this snapshot as those are cleaned up afterwards by {@link #cleanupStaleBlobs}) - * @param snapshotId SnapshotId to remove from all the given indices - * @param listener Listener to invoke when finished - */ - private void deleteIndices(RepositoryData repositoryData, List indices, SnapshotId snapshotId, - ActionListener> listener) { - - if (indices.isEmpty()) { - listener.onResponse(Collections.emptyList()); - return; - } - - // Listener that flattens out the delete results for each index - final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>( - ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - for (IndexId indexId : indices) { - executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, - deleteIdxMetaListener -> { - final IndexMetaData indexMetaData; - try { - indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); - } catch (Exception ex) { - logger.warn(() -> - new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); - // Just invoke the listener without any shard generations to count it down, this index will be cleaned up - // by the stale data cleanup in the end. - deleteIdxMetaListener.onResponse(null); - return; - } - final int shardCount = indexMetaData.getNumberOfShards(); - assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; - // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index - final ActionListener allShardsListener = - new GroupedActionListener<>(deleteIdxMetaListener, shardCount); - final Index index = indexMetaData.getIndex(); - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - final ShardId shard = new ShardId(index, shardId); - executor.execute(new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - allShardsListener.onResponse( - deleteShardSnapshot(repositoryData, indexId, shard, snapshotId)); - } - - @Override - public void onFailure(Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, indexId.getName(), shard.id()), ex); - // Just passing null here to count down the listener instead of failing it, the stale data left behind - // here will be retried in the next delete or repository cleanup - allShardsListener.onResponse(null); - } - }); - } - })); - } - } - @Override public void finalizeSnapshot(final SnapshotId snapshotId, - final List indices, + final ShardGenerations shardGenerations, final long startTime, final String failure, final int totalShards, @@ -653,15 +717,25 @@ public void finalizeSnapshot(final SnapshotId snapshotId, final boolean includeGlobalState, final MetaData clusterMetaData, final Map userMetadata, + boolean writeShardGens, final ActionListener listener) { - // We upload one meta blob for each index, one for the cluster-state and one snap-${uuid}.dat blob - // Once we're done writing all metadata, we update the index-N blob to finalize the snapshot + final Collection indices = shardGenerations.indices(); + // Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard + // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION + // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened + // when writing the index-${N} to each shard directory. final ActionListener allMetaListener = new GroupedActionListener<>( ActionListener.wrap(snapshotInfos -> { assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); - writeIndexGen(getRepositoryData().addSnapshot(snapshotId, snapshotInfo.state(), indices), repositoryStateId); + final RepositoryData existingRepositoryData = getRepositoryData(); + final RepositoryData updatedRepositoryData = + existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations); + writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens); + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } listener.onResponse(snapshotInfo); }, e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e))), @@ -694,6 +768,20 @@ public void finalizeSnapshot(final SnapshotId snapshotId, })); } + // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data + private void cleanupOldShardGens(RepositoryData existingRepositoryData, RepositoryData updatedRepositoryData) { + final List toDelete = new ArrayList<>(); + final int prefixPathLen = basePath().buildAsString().length(); + updatedRepositoryData.shardGenerations().obsoleteShardGenerations(existingRepositoryData.shardGenerations()).forEach( + (indexId, gens) -> gens.forEach((shardId, oldGen) -> toDelete.add( + shardContainer(indexId, shardId).path().buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen))); + try { + blobContainer().deleteBlobsIgnoringIfNotExists(toDelete); + } catch (Exception e) { + logger.warn("Failed to clean up old shard generation blobs", e); + } + } + @Override public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { try { @@ -854,7 +942,8 @@ public boolean isReadOnly() { return readOnly; } - protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen) throws IOException { + protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen, + final boolean writeShardGens) throws IOException { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = repositoryData.getGenId(); if (currentGen != expectedGen) { @@ -868,7 +957,8 @@ protected void writeIndexGen(final RepositoryData repositoryData, final long exp // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder())), true); + writeAtomic(indexBlob, + BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { @@ -956,7 +1046,8 @@ private void writeAtomic(final String blobName, final BytesReference bytesRef, b @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); final ActionListener snapshotDoneListener = ActionListener.wrap(listener::onResponse, e -> { @@ -964,19 +1055,23 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s listener.onFailure(e instanceof IndexShardSnapshotFailedException ? e : new IndexShardSnapshotFailedException(shardId, e)); }); try { - logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); - + final String generation = snapshotStatus.generation(); + logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation); final BlobContainer shardContainer = shardContainer(indexId, shardId); final Set blobs; - try { - blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + if (generation == null) { + try { + blobs = shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e); + } + } else { + blobs = Collections.singleton(INDEX_FILE_PREFIX + generation); } - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); + Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer, generation); BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - long fileListGeneration = tuple.v2(); + String fileListGeneration = tuple.v2(); if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) { throw new IndexShardSnapshotFailedException(shardId, @@ -1074,27 +1169,34 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s for (SnapshotFiles point : snapshots) { newSnapshotsList.add(point); } - final String indexGeneration = Long.toString(fileListGeneration + 1); final List blobsToDelete; - try { - final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); - indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); + final String indexGeneration; + if (writeShardGens) { + indexGeneration = UUIDs.randomBase64UUID(); + blobsToDelete = Collections.emptyList(); + } else { + indexGeneration = Long.toString(Long.parseLong(fileListGeneration) + 1); // Delete all previous index-N blobs blobsToDelete = blobs.stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList()); assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, ""))) .max().orElse(-1L) < Long.parseLong(indexGeneration) : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration + "] when deleting index-N blobs " + blobsToDelete; + } + try { + writeShardIndexBlob(shardContainer, indexGeneration, new BlobStoreIndexShardSnapshots(newSnapshotsList)); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to finalize snapshot creation [" + snapshotId + "] with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e); } - try { - shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", - snapshotId, shardId), e); + if (writeShardGens == false) { + try { + shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization", + snapshotId, shardId), e); + } } snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis(), indexGeneration); snapshotDoneListener.onResponse(indexGeneration); @@ -1251,45 +1353,30 @@ public String toString() { } /** - * Delete shard snapshot + * Delete snapshot from shard level metadata. */ - private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, - SnapshotId snapshotId) throws IOException { - final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId); - final Set blobs; - try { - blobs = shardContainer.listBlobs().keySet(); - } catch (IOException e) { - throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e); - } - - Tuple tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); - BlobStoreIndexShardSnapshots snapshots = tuple.v1(); - long fileListGeneration = tuple.v2(); - + private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(Set survivingSnapshots, IndexId indexId, + ShardId snapshotShardId, SnapshotId snapshotId, + BlobContainer shardContainer, Set blobs, + BlobStoreIndexShardSnapshots snapshots, String indexGeneration) { // Build a list of snapshots that should be preserved List newSnapshotsList = new ArrayList<>(); - final Set survivingSnapshotNames = - repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet()); + final Set survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet()); for (SnapshotFiles point : snapshots) { if (survivingSnapshotNames.contains(point.snapshot())) { newSnapshotsList.add(point); } } - final String indexGeneration = Long.toString(fileListGeneration + 1); try { - final List blobsToDelete; if (newSnapshotsList.isEmpty()) { - // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found - blobsToDelete = List.copyOf(blobs); + return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), ShardGenerations.DELETED_SHARD_GEN, blobs); } else { - final Set survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID) - .collect(Collectors.toSet()); final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList); - indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); - blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots); + writeShardIndexBlob(shardContainer, indexGeneration, updatedSnapshots); + final Set survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); + return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), indexGeneration, + unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots)); } - return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), blobsToDelete); } catch (IOException e) { throw new IndexShardSnapshotFailedException(snapshotShardId, "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index [" @@ -1297,6 +1384,23 @@ private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData reposit } } + private void writeShardIndexBlob(BlobContainer shardContainer, String indexGeneration, + BlobStoreIndexShardSnapshots updatedSnapshots) throws IOException { + assert ShardGenerations.NEW_SHARD_GEN.equals(indexGeneration) == false; + assert ShardGenerations.DELETED_SHARD_GEN.equals(indexGeneration) == false; + indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration); + } + + private static Set getShardBlobs(final ShardId snapshotShardId, final BlobContainer shardContainer) { + final Set blobs; + try { + blobs = shardContainer.listBlobs().keySet(); + } catch (IOException e) { + throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e); + } + return blobs; + } + // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all // temporary blobs private static List unusedBlobs(Set blobs, Set survivingSnapshotUUIDs, @@ -1310,7 +1414,6 @@ private static List unusedBlobs(Set blobs, Set surviving || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList()); } - /** * Loads information about shard snapshot */ @@ -1325,6 +1428,29 @@ private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContain } } + /** + * Loads all available snapshots in the repository using the given {@code generation} or falling back to trying to determine it from + * the given list of blobs in the shard container. + * + * @param blobs list of blobs in repository + * @param generation shard generation or {@code null} in case there was no shard generation tracked in the {@link RepositoryData} for + * this shard because its snapshot was created in a version older than + * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. + * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation + */ + private Tuple buildBlobStoreIndexShardSnapshots(Set blobs, + BlobContainer shardContainer, + @Nullable String generation) throws IOException { + if (generation != null) { + if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) { + return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN); + } + return new Tuple<>(indexShardSnapshotsFormat.read(shardContainer, generation), generation); + } + final Tuple legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); + return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2())); + } + /** * Loads all available snapshots in the repository * @@ -1413,12 +1539,16 @@ private static final class ShardSnapshotMetaDeleteResult { // Shard id that the snapshot was removed from private final int shardId; + // Id of the new index-${uuid} blob that does not include the snapshot any more + private final String newGeneration; + // Blob names in the shard directory that have become unreferenced in the new shard generation private final Collection blobsToDelete; - ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, Collection blobsToDelete) { + ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, String newGeneration, Collection blobsToDelete) { this.indexId = indexId; this.shardId = shardId; + this.newGeneration = newGeneration; this.blobsToDelete = blobsToDelete; } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java index 8b2bedff134b3..d4f3329d354b4 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java @@ -70,10 +70,12 @@ * | | |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for * | | | snapshot "20131011" * | | |- index-123 - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} for - * | | | the shard + * | | | the shard (files with numeric suffixes were created by older versions, newer ES versions use a uuid + * | | | suffix instead) * | | * | |- 1/ - data for shard "1" of index "foo" * | | |- __1 + * | | |- index-Zc2SS8ZgR8JvZAHlSMyMXy - SMILE serialized {@code BlobStoreIndexShardSnapshots} for the shard * | | ..... * | | * | |-2/ @@ -132,8 +134,9 @@ * *
    *
  1. Create the {@link org.apache.lucene.index.IndexCommit} for the shard to snapshot.
  2. - *
  3. List all blobs in the shard's path. Find the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob - * with name {@code index-${N}} for the highest possible value of {@code N} in the list to get the information of what segment files are + *
  4. Get the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob + * with name {@code index-${uuid}} with the {@code uuid} generation returned by + * {@link org.elasticsearch.repositories.ShardGenerations#getShardGen} to get the information of what segment files are * already available in the blobstore.
  5. *
  6. By comparing the files in the {@code IndexCommit} and the available file list from the previous step, determine the segment files * that need to be written to the blob store. For each segment that needs to be added to the blob store, generate a unique name by combining @@ -143,7 +146,7 @@ * the shard's path and contains a list of all the files referenced by the snapshot as well as some metadata about the snapshot. See the * documentation of {@code BlobStoreIndexShardSnapshot} for details on its contents.
  7. *
  8. Once all the segments and the {@code BlobStoreIndexShardSnapshot} blob have been written, an updated - * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${N+1}}.
  9. + * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${newUUID}}. *
* *

Finalizing the Snapshot

@@ -171,11 +174,6 @@ * *
    *
  1. Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.
  2. - *
  3. Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the - * repository root.
  4. - *
  5. Write an updated {@code index.latest} blob containing {@code N + 1}.
  6. - *
  7. Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot - * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
  8. *
  9. For each index referenced by the snapshot: *
      *
    1. Delete the snapshot's {@code IndexMetaData} at {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}}.
    2. @@ -184,16 +182,22 @@ *
    3. Remove the {@code BlobStoreIndexShardSnapshot} blob at {@code /indices/${index-snapshot-uuid}/${i}/snap-${snapshot-uuid}.dat}.
    4. *
    5. List all blobs in the shard path {@code /indices/${index-snapshot-uuid}} and build a new {@code BlobStoreIndexShardSnapshots} from * the remaining {@code BlobStoreIndexShardSnapshot} blobs in the shard. Afterwards, write it to the next shard generation blob at - * {@code /indices/${index-snapshot-uuid}/${i}/index-${N+1}} (The shard's generation is determined from the list of {@code index-N} blobs - * in the shard directory).
    6. - *
    7. Delete all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by - * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step.
    8. + * {@code /indices/${index-snapshot-uuid}/${i}/index-${uuid}} (The shard's generation is determined from the map of shard generations in + * the {@link org.elasticsearch.repositories.RepositoryData} in the root {@code index-${N}} blob of the repository. + *
    9. Collect all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by + * the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step as well as the previous index-${uuid} + * blob so that it can be deleted at the end of the snapshot delete process.
    10. *
    *
  10. + *
  11. Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the + * repository root and the repository generations that were changed in the affected shards adjusted.
  12. + *
  13. Write an updated {@code index.latest} blob containing {@code N + 1}.
  14. + *
  15. Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot + * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
  16. + *
  17. Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs + * under the repository root that are not referenced by the new {@code RepositoryData} written in the previous step.
  18. *
* * - * TODO: The above sequence of actions can lead to leaking files when an index completely goes out of scope. Adjust this documentation once - * https://github.com/elastic/elasticsearch/issues/13159 is fixed. */ package org.elasticsearch.repositories.blobstore; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 9435df2177a50..9b256ecaa077c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -281,7 +281,9 @@ private void startNewShards(SnapshotsInProgress.Entry entry, Map() { + assert entry.useShardGenerations() || snapshotStatus.generation() == null : + "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; + snapshot(shardId, snapshot, indexId, snapshotStatus, entry.useShardGenerations(), new ActionListener<>() { @Override public void onResponse(String newGeneration) { assert newGeneration != null; @@ -311,7 +313,7 @@ public void onFailure(Exception e) { * @param snapshotStatus snapshot status */ private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, - final IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener) { try { final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); if (indexShard.routingEntry().primary() == false) { @@ -334,7 +336,7 @@ private void snapshot(final ShardId shardId, final Snapshot snapshot, final Inde // we flush first to make sure we get the latest writes snapshotted snapshotRef = indexShard.acquireLastIndexCommit(true); repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, - snapshotRef.getIndexCommit(), snapshotStatus, ActionListener.runBefore(listener, snapshotRef::close)); + snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, ActionListener.runBefore(listener, snapshotRef::close)); } catch (Exception e) { IOUtils.close(snapshotRef); throw e; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 46854b5983352..9e5306edffa1e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -71,6 +71,7 @@ import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -285,15 +286,16 @@ public ClusterState execute(ClusterState currentState) { request.indicesOptions(), request.indices())); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); List snapshotIndices = repositoryData.resolveNewIndices(indices); - newSnapshot = new SnapshotsInProgress.Entry(new Snapshot(repositoryName, snapshotId), - request.includeGlobalState(), - request.partial(), - State.INIT, - snapshotIndices, - threadPool.absoluteTimeInMillis(), - repositoryData.getGenId(), - null, - request.userMetadata()); + newSnapshot = new SnapshotsInProgress.Entry( + new Snapshot(repositoryName, snapshotId), + request.includeGlobalState(), request.partial(), + State.INIT, + snapshotIndices, + threadPool.absoluteTimeInMillis(), + repositoryData.getGenId(), + null, + request.userMetadata(), + clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION)); initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); } else { @@ -443,8 +445,7 @@ public ClusterState execute(ClusterState currentState) { hadAbortedInitializations = true; } else { // Replace the snapshot that was just initialized - ImmutableOpenMap shards = - shards(currentState, entry.indices()); + ImmutableOpenMap shards = shards(currentState, entry, repositoryData); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -556,7 +557,7 @@ private void cleanupAfterError(Exception exception) { if (snapshotCreated) { repositoriesService.repository(snapshot.snapshot().getRepository()) .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - snapshot.indices(), + buildGenerations(snapshot), snapshot.startTime(), ExceptionsHelper.stackTrace(exception), 0, @@ -564,7 +565,9 @@ private void cleanupAfterError(Exception exception) { snapshot.getRepositoryStateId(), snapshot.includeGlobalState(), metaDataForSnapshot(snapshot, clusterService.state().metaData()), - snapshot.userMetadata(), ActionListener.runAfter(ActionListener.wrap(ignored -> { + snapshot.userMetadata(), + snapshot.useShardGenerations(), + ActionListener.runAfter(ActionListener.wrap(ignored -> { }, inner -> { inner.addSuppressed(exception); logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository", @@ -577,6 +580,14 @@ private void cleanupAfterError(Exception exception) { } } + private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) { + ShardGenerations.Builder builder = ShardGenerations.builder(); + final Map indexLookup = new HashMap<>(); + snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx)); + snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation())); + return builder.build(); + } + private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) { if (snapshot.includeGlobalState() == false) { // Remove global state from the cluster state @@ -760,7 +771,8 @@ private void finalizeSnapshotDeletionFromPreviousMaster(ClusterState state) { if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId(), + state.nodes().getMinNodeVersion()); } } @@ -1009,7 +1021,7 @@ protected void doRun() { } repository.finalizeSnapshot( snapshot.getSnapshotId(), - entry.indices(), + buildGenerations(entry), entry.startTime(), failure, entry.shards().size(), @@ -1017,7 +1029,9 @@ protected void doRun() { entry.getRepositoryStateId(), entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), - entry.userMetadata(), ActionListener.wrap(snapshotInfo -> { + entry.userMetadata(), + entry.useShardGenerations(), + ActionListener.wrap(snapshotInfo -> { removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); }, this::onFailure)); @@ -1299,7 +1313,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS )); } else { logger.debug("deleted snapshot is not running - deleting files"); - deleteSnapshotFromRepository(snapshot, listener, repositoryStateId); + deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } } }); @@ -1338,15 +1352,18 @@ public static boolean isRepositoryInUse(ClusterState clusterState, String reposi * @param snapshot snapshot * @param listener listener * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began + * @param version minimum ES version the repository should be readable by */ - private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId) { + private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId, + Version version) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> { - logger.info("snapshot [{}] deleted", snapshot); - removeSnapshotDeletionFromClusterState(snapshot, null, l); - }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l) - )); + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION), + ActionListener.wrap(v -> { + logger.info("snapshot [{}] deleted", snapshot); + removeSnapshotDeletionFromClusterState(snapshot, null, l); + }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l) + )); })); } @@ -1399,38 +1416,59 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS * Calculates the list of shards that should be included into the current snapshot * * @param clusterState cluster state - * @param indices list of indices to be snapshotted + * @param snapshot SnapshotsInProgress Entry * @return list of shard to be included into current snapshot */ private static ImmutableOpenMap shards(ClusterState clusterState, - List indices) { + SnapshotsInProgress.Entry snapshot, + RepositoryData repositoryData) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); MetaData metaData = clusterState.metaData(); - for (IndexId index : indices) { + final ShardGenerations shardGenerations = repositoryData.shardGenerations(); + for (IndexId index : snapshot.indices()) { final String indexName = index.getName(); + final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false; IndexMetaData indexMetaData = metaData.index(indexName); if (indexMetaData == null) { // The index was deleted before we managed to start the snapshot - mark it as missing. - builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), missingStatus(null, "missing index")); + builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index", null)); } else { IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName); for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { ShardId shardId = new ShardId(indexMetaData.getIndex(), i); + final String shardRepoGeneration; + if (snapshot.useShardGenerations()) { + if (isNewIndex) { + assert shardGenerations.getShardGen(index, shardId.getId()) == null + : "Found shard generation for new index [" + index + "]"; + shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN; + } else { + shardRepoGeneration = shardGenerations.getShardGen(index, shardId.getId()); + } + } else { + shardRepoGeneration = null; + } if (indexRoutingTable != null) { ShardRouting primary = indexRoutingTable.shard(i).primaryShard(); if (primary == null || !primary.assignedToNode()) { - builder.put(shardId, missingStatus(null, "primary shard is not allocated")); + builder.put(shardId, + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated", + shardRepoGeneration)); } else if (primary.relocating() || primary.initializing()) { builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus( - primary.currentNodeId(), ShardState.WAITING, null)); + primary.currentNodeId(), ShardState.WAITING, shardRepoGeneration)); } else if (!primary.started()) { - builder.put(shardId, missingStatus(primary.currentNodeId(), "primary shard hasn't been started yet")); + builder.put(shardId, + new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING, + "primary shard hasn't been started yet", shardRepoGeneration)); } else { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus( - primary.currentNodeId(), null)); + builder.put(shardId, + new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), shardRepoGeneration)); } } else { - builder.put(shardId, missingStatus(null, "missing routing table")); + builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, + "missing routing table", shardRepoGeneration)); } } } @@ -1439,10 +1477,6 @@ private static ImmutableOpenMap> waitingIndices = entry.waitingIndices(); assertEquals(2, waitingIndices.get(idx1Name).size()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java index 70d1ef34b092b..b77653c34c769 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java @@ -62,7 +62,7 @@ public void testDeleteSnapshotting() { SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false, SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")), System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(), - SnapshotInfoTests.randomUserMetadata())); + SnapshotInfoTests.randomUserMetadata(), randomBoolean())); ClusterState state = ClusterState.builder(clusterState(index)) .putCustom(SnapshotsInProgress.TYPE, snaps) .build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 1a385fb956f27..f5aad6f05cd3b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -472,7 +472,7 @@ private static ClusterState addSnapshotIndex(final String index, final int numSh final SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT, Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(), - SnapshotInfoTests.randomUserMetadata()); + SnapshotInfoTests.randomUserMetadata(), randomBoolean()); return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build(); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index ff6e7194a653b..63e5aa820d137 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -154,15 +154,15 @@ public RepositoryData getRepositoryData() { } @Override - public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, Map userMetadata, - ActionListener listener) { + boolean writeShardGens, ActionListener listener) { listener.onResponse(null); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { listener.onResponse(null); } @@ -198,7 +198,8 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit - snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index 1958e37778bc7..1067013d20e97 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -72,7 +72,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() { public void testXContent() throws IOException { RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = JsonXContent.contentBuilder(); - repositoryData.snapshotsToXContent(builder); + repositoryData.snapshotsToXContent(builder, true); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { long gen = (long) randomIntBetween(0, 500); RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen); @@ -90,18 +90,22 @@ public void testAddSnapshots() { List indices = new ArrayList<>(); Set newIndices = new HashSet<>(); int numNew = randomIntBetween(1, 10); + final ShardGenerations.Builder builder = ShardGenerations.builder(); for (int i = 0; i < numNew; i++) { IndexId indexId = new IndexId(randomAlphaOfLength(7), UUIDs.randomBase64UUID()); newIndices.add(indexId); indices.add(indexId); + builder.put(indexId, 0, "1"); } int numOld = randomIntBetween(1, indexIdMap.size()); List indexNames = new ArrayList<>(indexIdMap.keySet()); for (int i = 0; i < numOld; i++) { - indices.add(indexIdMap.get(indexNames.get(i))); + final IndexId indexId = indexIdMap.get(indexNames.get(i)); + indices.add(indexId); + builder.put(indexId, 0, "2"); } RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, - randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), indices); + randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build()); // verify that the new repository data has the new snapshot and its indices assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot)); for (IndexId indexId : indices) { @@ -124,10 +128,11 @@ public void testInitIndices() { snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values())); } RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, - Collections.emptyMap(), Collections.emptyMap()); + Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); // test that initializing indices works Map> indices = randomIndices(snapshotIds); - RepositoryData newRepoData = new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices); + RepositoryData newRepoData = + new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, indices, ShardGenerations.EMPTY); List expected = new ArrayList<>(repositoryData.getSnapshotIds()); Collections.sort(expected); List actual = new ArrayList<>(newRepoData.getSnapshotIds()); @@ -143,7 +148,7 @@ public void testRemoveSnapshot() { List snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds()); assertThat(snapshotIds.size(), greaterThan(0)); SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1)); - RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId); + RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId, ShardGenerations.EMPTY); // make sure the repository data's indices no longer contain the removed snapshot for (final IndexId indexId : newRepositoryData.getIndices().values()) { assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId)); @@ -163,7 +168,7 @@ public void testResolveIndexId() { public void testGetSnapshotState() { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()); final SnapshotState state = randomFrom(SnapshotState.values()); - final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, Collections.emptyList()); + final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, ShardGenerations.EMPTY); assertEquals(state, repositoryData.getSnapshotState(snapshotId)); assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()))); } @@ -173,7 +178,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { final RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = XContentBuilder.builder(xContent); - repositoryData.snapshotsToXContent(builder); + repositoryData.snapshotsToXContent(builder, true); RepositoryData parsedRepositoryData; try (XContentParser xParser = createParser(builder)) { parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId()); @@ -190,6 +195,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { final IndexId corruptedIndexId = randomFrom(parsedRepositoryData.getIndices().values()); Map> indexSnapshots = new HashMap<>(); + final ShardGenerations.Builder shardGenBuilder = ShardGenerations.builder(); for (Map.Entry snapshottedIndex : parsedRepositoryData.getIndices().entrySet()) { IndexId indexId = snapshottedIndex.getValue(); Set snapshotsIds = new LinkedHashSet<>(parsedRepositoryData.getSnapshots(indexId)); @@ -197,14 +203,18 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { snapshotsIds.add(new SnapshotId("_uuid", "_does_not_exist")); } indexSnapshots.put(indexId, snapshotsIds); + final int shardCount = randomIntBetween(1, 10); + for (int i = 0; i < shardCount; ++i) { + shardGenBuilder.put(indexId, i, UUIDs.randomBase64UUID(random())); + } } assertNotNull(corruptedIndexId); RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates, - indexSnapshots); + indexSnapshots, shardGenBuilder.build()); final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent); - corruptedRepositoryData.snapshotsToXContent(corruptedBuilder); + corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true); try (XContentParser xParser = createParser(corruptedBuilder)) { ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> @@ -262,7 +272,14 @@ public static RepositoryData generateRandomRepoData() { for (int i = 0; i < numSnapshots; i++) { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()); final List someIndices = indices.subList(0, randomIntBetween(1, numIndices)); - repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), someIndices); + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (IndexId someIndex : someIndices) { + final int shardCount = randomIntBetween(1, 10); + for (int j = 0; j < shardCount; ++j) { + builder.put(someIndex, 0, UUIDs.randomBase64UUID(random())); + } + } + repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), builder.build()); } return repositoryData; } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index b2f21e89a7d61..59a977efe5f5d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -21,6 +21,8 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -41,14 +43,17 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.snapshots.SnapshotInfo; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.Collections; import java.util.List; import static org.hamcrest.Matchers.containsString; @@ -163,6 +168,13 @@ public void testSnapshotWithConflictingName() throws IOException { assertNotNull(shardGen); final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId( snapshot.getSnapshotId().getName(), "_uuid2")); + final PlainActionFuture future = PlainActionFuture.newFuture(); + repository.finalizeSnapshot(snapshot.getSnapshotId(), + ShardGenerations.builder().put(indexId, 0, shardGen).build(), + 0L, null, 1, Collections.emptyList(), -1L, false, + MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), true, + future); + future.actionGet(); IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class, () -> snapshotShard(shard, snapshotWithSameName, repository)); assertThat(isfe.getMessage(), containsString("Duplicate snapshot name")); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 13d7536730ed6..32b65880c8249 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotState; @@ -43,7 +44,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -141,7 +141,7 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { // write to and read from a index file with no entries assertThat(repository.getRepositoryData().getSnapshotIds().size(), equalTo(0)); final RepositoryData emptyData = RepositoryData.EMPTY; - repository.writeIndexGen(emptyData, emptyData.getGenId()); + repository.writeIndexGen(emptyData, emptyData.getGenId(), true); RepositoryData repoData = repository.getRepositoryData(); assertEquals(repoData, emptyData); assertEquals(repoData.getIndices().size(), 0); @@ -150,12 +150,12 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { // write to and read from an index file with snapshots but no indices repoData = addRandomSnapshotsToRepoData(repoData, false); - repository.writeIndexGen(repoData, repoData.getGenId()); + repository.writeIndexGen(repoData, repoData.getGenId(), true); assertEquals(repoData, repository.getRepositoryData()); // write to and read from a index file with random repository data repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); - repository.writeIndexGen(repoData, repoData.getGenId()); + repository.writeIndexGen(repoData, repoData.getGenId(), true); assertEquals(repoData, repository.getRepositoryData()); } @@ -164,21 +164,22 @@ public void testIndexGenerationalFiles() throws Exception { // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); - repository.writeIndexGen(repositoryData, repositoryData.getGenId()); + repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); assertThat(repository.getRepositoryData(), equalTo(repositoryData)); assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); // adding more and writing to a new index generational file repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); - repository.writeIndexGen(repositoryData, repositoryData.getGenId()); + repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); // removing a snapshot and writing to a new index generational file - repositoryData = repository.getRepositoryData().removeSnapshot(repositoryData.getSnapshotIds().iterator().next()); - repository.writeIndexGen(repositoryData, repositoryData.getGenId()); + repositoryData = repository.getRepositoryData().removeSnapshot( + repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY); + repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); @@ -190,12 +191,12 @@ public void testRepositoryDataConcurrentModificationNotAllowed() throws IOExcept // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); - repository.writeIndexGen(repositoryData, startingGeneration); + repository.writeIndexGen(repositoryData, startingGeneration, true); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created expectThrows(RepositoryException.class, () -> repository.writeIndexGen( - repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId())); + repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId(), true)); } public void testBadChunksize() throws Exception { @@ -242,12 +243,12 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo for (int i = 0; i < numSnapshots; i++) { SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()); int numIndices = inclIndices ? randomIntBetween(0, 20) : 0; - List indexIds = new ArrayList<>(numIndices); + final ShardGenerations.Builder builder = ShardGenerations.builder(); for (int j = 0; j < numIndices; j++) { - indexIds.add(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())); + builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1"); } repoData = repoData.addSnapshot(snapshotId, - randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), indexIds); + randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), builder.build()); } return repoData; } diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 45ad368535380..5694629325557 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -103,8 +103,7 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); - repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, - snapshotStatus, future1); + repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future1); future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -130,7 +129,7 @@ public void testSnapshotAndRestore() throws IOException, InterruptedException { final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); - repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, future2); + repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, future2); future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(2, copy.getIncrementalFileCount()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 8d676f7fa0a86..2cef48e1969f3 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -85,6 +85,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.StoredScriptsIT; @@ -1459,7 +1460,8 @@ public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exceptio for (String index : indices) { Path shardZero = indicesPath.resolve(indexIds.get(index).getId()).resolve("0"); if (randomBoolean()) { - Files.delete(shardZero.resolve("index-0")); + Files.delete( + shardZero.resolve("index-" + getRepositoryData(repository).shardGenerations().getShardGen(indexIds.get(index), 0))); } Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat")); } @@ -1620,6 +1622,8 @@ public void testDeleteSnapshotWithCorruptedGlobalState() throws Exception { } public void testSnapshotWithMissingShardLevelIndexFile() throws Exception { + disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks"); + Path repo = randomRepoPath(); logger.info("--> creating repository at {}", repo.toAbsolutePath()); assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings( @@ -1639,7 +1643,13 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception { try (Stream files = Files.list(repo.resolve("indices"))) { files.forEach(indexPath -> { try { - Files.delete(indexPath.resolve("0").resolve("index-0")); + final Path shardGen; + try (Stream shardFiles = Files.list(indexPath.resolve("0"))) { + shardGen = shardFiles + .filter(file -> file.getFileName().toString().startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) + .findFirst().orElseThrow(() -> new AssertionError("Failed to find shard index blob")); + } + Files.delete(shardGen); } catch (IOException e) { throw new RuntimeException("Failed to delete expected file", e); } @@ -1650,11 +1660,11 @@ public void testSnapshotWithMissingShardLevelIndexFile() throws Exception { CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2") .setWaitForCompletion(true).setIndices("test-idx-1").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), createSnapshotResponse.getSnapshotInfo().totalShards()); + assertEquals(createSnapshotResponse.getSnapshotInfo().successfulShards(), + createSnapshotResponse.getSnapshotInfo().totalShards() - 1); logger.info("--> restoring the first snapshot, the repository should not have lost any shard data despite deleting index-N, " + - "because it should have iterated over the snap-*.data files as backup"); + "because it uses snap-*.data files and not the index-N to determine what files to restore"); client().admin().indices().prepareDelete("test-idx-1", "test-idx-2").get(); RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).get(); @@ -3000,7 +3010,7 @@ public void testSnapshotWithCorruptedShardIndexFile() throws Exception { final IndexId corruptedIndex = indexIds.get(indexName); final Path shardIndexFile = repo.resolve("indices") .resolve(corruptedIndex.getId()).resolve("0") - .resolve("index-0"); + .resolve("index-" + repositoryData.shardGenerations().getShardGen(corruptedIndex, 0)); logger.info("--> truncating shard index file [{}]", shardIndexFile); try (SeekableByteChannel outChan = Files.newByteChannel(shardIndexFile, StandardOpenOption.WRITE)) { @@ -3761,6 +3771,86 @@ public void testRestoreIncreasesPrimaryTerms() { } } + public void testSnapshotDifferentIndicesBySameName() { + String indexName = "testindex"; + String repoName = "test-repo"; + String absolutePath = randomRepoPath().toAbsolutePath().toString(); + logger.info("Path [{}]", absolutePath); + + final int initialShardCount = randomIntBetween(1, 10); + createIndex(indexName, Settings.builder().put("index.number_of_shards", initialShardCount).build()); + ensureGreen(); + + logger.info("--> indexing some documents"); + final int docCount = initialShardCount * randomIntBetween(1, 10); + for (int i = 0; i < docCount; i++) { + index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); + } + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("fs") + .setSettings(Settings.builder().put("location", absolutePath))); + + logger.info("--> snapshot with [{}] shards", initialShardCount); + final SnapshotInfo snapshot1 = + client().admin().cluster().prepareCreateSnapshot(repoName, "snap-1").setWaitForCompletion(true).get().getSnapshotInfo(); + assertThat(snapshot1.state(), is(SnapshotState.SUCCESS)); + assertThat(snapshot1.successfulShards(), is(initialShardCount)); + + logger.info("--> delete index"); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + final int newShardCount = randomIntBetween(1, 10); + createIndex(indexName, Settings.builder().put("index.number_of_shards", newShardCount).build()); + ensureGreen(); + + logger.info("--> indexing some documents"); + final int newDocCount = newShardCount * randomIntBetween(1, 10); + for (int i = 0; i < newDocCount; i++) { + index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); + } + + logger.info("--> snapshot with [{}] shards", newShardCount); + final SnapshotInfo snapshot2 = + client().admin().cluster().prepareCreateSnapshot(repoName, "snap-2").setWaitForCompletion(true).get().getSnapshotInfo(); + assertThat(snapshot2.state(), is(SnapshotState.SUCCESS)); + assertThat(snapshot2.successfulShards(), is(newShardCount)); + + logger.info("--> restoring snapshot 1"); + client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName) + .setRenameReplacement("restored-1").setWaitForCompletion(true).get(); + + logger.info("--> restoring snapshot 2"); + client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName) + .setRenameReplacement("restored-2").setWaitForCompletion(true).get(); + + logger.info("--> verify doc counts"); + assertHitCount(client().prepareSearch("restored-1").setSize(0).get(), docCount); + assertHitCount(client().prepareSearch("restored-2").setSize(0).get(), newDocCount); + + final String snapshotToDelete; + final String snapshotToRestore; + final int expectedCount; + if (randomBoolean()) { + snapshotToDelete = "snap-1"; + snapshotToRestore = "snap-2"; + expectedCount = newDocCount; + } else { + snapshotToDelete = "snap-2"; + snapshotToRestore = "snap-1"; + expectedCount = docCount; + } + logger.info("--> deleting snapshot [{}]", snapshotToDelete); + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get()); + logger.info("--> restoring snapshot [{}]", snapshotToRestore); + client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName) + .setRenameReplacement("restored-3").setWaitForCompletion(true).get(); + + logger.info("--> verify doc counts"); + assertHitCount(client().prepareSearch("restored-3").setSize(0).get(), expectedCount); + } + private void verifySnapshotInfo(final String repo, final GetSnapshotsResponse response, final Map> indicesPerSnapshot) { for (SnapshotInfo snapshotInfo : response.getSnapshots("test-repo")) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 7d59c71ed18ea..6059b44073442 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -77,7 +77,7 @@ private Entry randomSnapshot() { } ImmutableOpenMap shards = builder.build(); return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, - SnapshotInfoTests.randomUserMetadata()); + SnapshotInfoTests.randomUserMetadata(), randomBoolean()); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index ee766ef7360b5..b5756c89377ea 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -148,8 +149,9 @@ public void testOverwriteSnapshotInfoBlob() { // We create a snap- blob for snapshot "foo" in the first generation final PlainActionFuture future = PlainActionFuture.newFuture(); final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); - repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future); + // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. + repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, future); future.actionGet(); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. @@ -157,8 +159,8 @@ public void testOverwriteSnapshotInfoBlob() { () -> { final PlainActionFuture fut = PlainActionFuture.newFuture(); repository.finalizeSnapshot( - snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), fut); + snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(), + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, fut); fut.actionGet(); }); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); @@ -166,8 +168,8 @@ public void testOverwriteSnapshotInfoBlob() { // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. final PlainActionFuture future2 = PlainActionFuture.newFuture(); - repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), future2); + repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),true, future2); future2.actionGet(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 5824e35e41bb1..98e2f88292d56 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -822,12 +822,13 @@ protected String snapshotShard(final IndexShard shard, final Repository repository) throws IOException { final Index index = shard.shardId().getIndex(); final IndexId indexId = new IndexId(index.getName(), index.getUUID()); - final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); + final IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing( + repository.getRepositoryData().shardGenerations().getShardGen(indexId, shard.shardId().getId())); final PlainActionFuture future = PlainActionFuture.newFuture(); final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.getIndexCommit(), snapshotStatus, future); + indexCommitRef.getIndexCommit(), snapshotStatus, true, future); shardGen = future.actionGet(); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 6891ab1385cbe..22bacff49f022 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -31,16 +31,15 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static java.util.Collections.emptySet; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; @@ -87,21 +86,21 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind @Override public RepositoryData getRepositoryData() { - Map> map = new HashMap<>(); - map.put(new IndexId(indexName, "blah"), emptySet()); - return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), map); + final IndexId indexId = new IndexId(indexName, "blah"); + return new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), + Collections.singletonMap(indexId, emptySet()), ShardGenerations.EMPTY); } @Override - public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata, + boolean includeGlobalState, MetaData metaData, Map userMetadata, boolean writeShardGens, ActionListener listener) { listener.onResponse(null); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { listener.onResponse(null); } @@ -131,7 +130,8 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 0438d940bbdaf..12b926f93a059 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -33,6 +34,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.InternalTestCluster; @@ -45,11 +47,12 @@ import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.Locale; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -59,6 +62,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -106,6 +110,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex } assertIndexUUIDs(blobContainer, repositoryData); assertSnapshotUUIDs(repository, repositoryData); + assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); })); listener.actionGet(TimeValue.timeValueMinutes(1L)); } @@ -118,6 +123,27 @@ private static void assertIndexGenerations(BlobContainer repoRoot, long latestGe assertTrue(indexGenerations.length <= 2); } + private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException { + final BlobContainer indicesContainer = repoRoot.children().get("indices"); + for (IndexId index : shardGenerations.indices()) { + final List gens = shardGenerations.getGens(index); + if (gens.isEmpty() == false) { + final BlobContainer indexContainer = indicesContainer.children().get(index.getId()); + final Map shardContainers = indexContainer.children(); + for (int i = 0; i < gens.size(); i++) { + final String generation = gens.get(i); + assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN)); + if (generation != null && generation.equals(ShardGenerations.NEW_SHARD_GEN) == false) { + final String shardId = Integer.toString(i); + assertThat(shardContainers, hasKey(shardId)); + assertThat(shardContainers.get(shardId).listBlobsByPrefix(BlobStoreRepository.INDEX_FILE_PREFIX), + hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)); + } + } + } + } + } + private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { final List expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); @@ -151,6 +177,8 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito } else { indices = indicesContainer.children(); } + final Map maxShardCountsExpected = new HashMap<>(); + final Map maxShardCountsSeen = new HashMap<>(); // Assert that for each snapshot, the relevant metadata was written to index and shard folders for (SnapshotId snapshotId: snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); @@ -160,14 +188,27 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito final BlobContainer indexContainer = indices.get(indexId.getId()); assertThat(indexContainer.listBlobs(), hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID()))); + final IndexMetaData indexMetaData = repository.getSnapshotIndexMetaData(snapshotId, indexId); for (Map.Entry entry : indexContainer.children().entrySet()) { // Skip Lucene MockFS extraN directory if (entry.getKey().startsWith("extra")) { continue; } - if (snapshotInfo.shardFailures().stream().noneMatch(shardFailure -> - shardFailure.index().equals(index) && shardFailure.shardId() == Integer.parseInt(entry.getKey()))) { - final Map shardPathContents = entry.getValue().listBlobs(); + final int shardId = Integer.parseInt(entry.getKey()); + final int shardCount = indexMetaData.getNumberOfShards(); + maxShardCountsExpected.compute( + indexId, (i, existing) -> existing == null || existing < shardCount ? shardCount : existing); + final BlobContainer shardContainer = entry.getValue(); + // TODO: we shouldn't be leaking empty shard directories when a shard (but not all of the index it belongs to) + // becomes unreferenced. We should fix that and remove this conditional once its fixed. + if (shardContainer.listBlobs().keySet().stream().anyMatch(blob -> blob.startsWith("extra") == false)) { + final int impliedCount = shardId - 1; + maxShardCountsSeen.compute( + indexId, (i, existing) -> existing == null || existing < impliedCount ? impliedCount : existing); + } + if (shardId < shardCount && snapshotInfo.shardFailures().stream().noneMatch( + shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) { + final Map shardPathContents = shardContainer.listBlobs(); assertThat(shardPathContents, hasKey(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()))); assertThat(shardPathContents.keySet().stream() @@ -176,6 +217,8 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito } } } + maxShardCountsSeen.forEach(((indexId, count) -> assertThat("Found unreferenced shard paths for index [" + indexId + "]", + count, lessThanOrEqualTo(maxShardCountsExpected.get(indexId))))); } public static long createDanglingIndex(BlobStoreRepository repository, String name, Set files) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 12c8e72a0cb02..5ec68fb604bc7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -58,6 +58,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.FileRestoreContext; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; @@ -241,19 +242,19 @@ public RepositoryData getRepositoryData() { Index index = remoteIndices.get(indexName).getIndex(); indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singleton(snapshotId)); } - - return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots); + return new RepositoryData(1, copiedSnapshotIds, snapshotStates, indexSnapshots, ShardGenerations.EMPTY); } @Override - public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata, ActionListener listener) { + MetaData metaData, Map userMetadata, boolean writeShardGens, + ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @@ -288,7 +289,8 @@ public boolean isReadOnly() { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index aa72a9fc0031b..35f71a6c7f540 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -36,12 +36,14 @@ import org.elasticsearch.repositories.FilterRepository; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.ShardGenerations; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -78,21 +80,22 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { } @Override - public void finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, - List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata, ActionListener listener) { + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData metaData, Map userMetadata, + boolean writeShardGens, ActionListener listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { - super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata, listener); + super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, + includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, writeShardGens, listener); } catch (IOException ex) { listener.onFailure(ex); } } - private static MetaData metadataToSnapshot(List indices, MetaData metaData) throws IOException { + private static MetaData metadataToSnapshot(Collection indices, MetaData metaData) throws IOException { MetaData.Builder builder = MetaData.builder(metaData); for (IndexId indexId : indices) { IndexMetaData index = metaData.index(indexId.getName()); @@ -121,7 +124,8 @@ private static MetaData metadataToSnapshot(List indices, MetaData metaD @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { listener.onFailure( @@ -160,7 +164,7 @@ protected void closeInternal() { Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name())); toClose.add(reader); IndexCommit indexCommit = reader.getIndexCommit(); - super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, + super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens, ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); } catch (IOException e) { try { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 2cc5b27ae596b..c4ccf6bfd832b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -60,6 +60,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; @@ -99,7 +100,7 @@ public void testSourceIncomplete() throws IOException { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); assertEquals( "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", @@ -125,7 +126,7 @@ public void testIncrementalSnapshot() throws IOException { SnapshotId snapshotId = new SnapshotId("test", "test"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -141,7 +142,7 @@ public void testIncrementalSnapshot() throws IOException { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt @@ -157,7 +158,7 @@ public void testIncrementalSnapshot() throws IOException { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1_1.liv @@ -205,13 +206,15 @@ public void testRestoreMinmal() throws IOException { final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), - indexShardSnapshotStatus, future); + indexShardSnapshotStatus, true, future); future.actionGet(); final PlainActionFuture finFuture = PlainActionFuture.newFuture(); - repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId), + repository.finalizeSnapshot(snapshotId, + ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(), indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), repository.getRepositoryData().getGenId(), true, MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), + true, finFuture); finFuture.actionGet(); }); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index dff01f8e9b0bb..8dea5e2b6adbd 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -334,7 +334,8 @@ public void testOkToDeleteSnapshots() { new SnapshotsInProgress.Entry( snapshot, true, false, SnapshotsInProgress.State.INIT, Collections.singletonList(new IndexId("name", "id")), 0, 0, - ImmutableOpenMap.builder().build(), Collections.emptyMap())); + ImmutableOpenMap.builder().build(), Collections.emptyMap(), + randomBoolean())); ClusterState state = ClusterState.builder(new ClusterName("cluster")) .putCustom(SnapshotsInProgress.TYPE, inProgress) .build();