From 56eaf66775f5369410395ca21c90a44ec9428e9b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 5 Jun 2020 19:16:41 +0200 Subject: [PATCH] Deduplicate Index Metadata in BlobStore (#50278) This PR introduces two new fields in to `RepositoryData` (index-N) to track the blob name of `IndexMetaData` blobs and their content via setting generations and uuids. This is used to deduplicate the `IndexMetaData` blobs (`meta-{uuid}.dat` in the indices folders under `/indices` so that new metadata for an index is only written to the repository during a snapshot if that same metadata can't be found in another snapshot. This saves one write per index in the common case of unchanged metadata thus saving cost and making snapshot finalization drastically faster if many indices are being snapshotted at the same time. The implementation is mostly analogous to that for shard generations in #46250 and piggy backs on the BwC mechanism introduced in that PR (which means this PR needs adjustments if it doesn't go into `7.6`). Relates to #45736 as it improves the efficiency of snapshotting unchanged indices Relates to #49800 as it has the potential of loading the index metadata for multiple snapshots of the same index concurrently much more efficient speeding up future concurrent snapshot delete --- .../s3/S3BlobStoreRepositoryTests.java | 4 +- .../MultiVersionRepositoryAccessIT.java | 2 +- .../CorruptedBlobStoreRepositoryIT.java | 13 +- .../DedicatedClusterSnapshotRestoreIT.java | 83 ++++++++ ...etadataLoadingDuringSnapshotRestoreIT.java | 7 +- .../SharedClusterSnapshotRestoreIT.java | 3 +- .../TransportSnapshotsStatusAction.java | 2 +- .../repositories/FilterRepository.java | 4 +- .../IndexMetaDataGenerations.java | 177 ++++++++++++++++ .../repositories/Repository.java | 3 +- .../repositories/RepositoryData.java | 120 +++++++++-- .../blobstore/BlobStoreRepository.java | 189 +++++++++++------- .../snapshots/RestoreService.java | 2 +- .../snapshots/SnapshotsService.java | 13 +- .../RepositoriesServiceTests.java | 3 +- .../repositories/RepositoryDataTests.java | 95 +++++++-- .../blobstore/BlobStoreRepositoryTests.java | 13 +- .../snapshots/SnapshotResiliencyTests.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 6 +- .../blobstore/BlobStoreTestUtil.java | 27 ++- .../AbstractSnapshotIntegTestCase.java | 6 +- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../SourceOnlySnapshotShardTests.java | 3 +- 23 files changed, 644 insertions(+), 139 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index e240485fe625..fb8215294e17 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -160,8 +160,8 @@ public void testEnforcedCooldownPeriod() throws IOException { final RepositoryData repositoryData = getRepositoryData(repository); final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); - final BytesReference serialized = - BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), + SnapshotsService.OLD_SNAPSHOT_FORMAT)); PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { try (InputStream stream = serialized.streamInput()) { repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index f3b02c71578b..616b19345c24 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -218,7 +218,7 @@ public void testUpgradeMovesRepoToNewMetaVersion() throws IOException { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) { + if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); final List> expectedExceptions = Arrays.asList(ResponseException.class, ElasticsearchStatusException.class); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index d4f3d117732f..c12c0b5d17c6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -273,11 +274,12 @@ public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception { SnapshotId::getUUID, Function.identity())), repositoryData.getSnapshotIds().stream().collect(Collectors.toMap( SnapshotId::getUUID, repositoryData::getSnapshotState)), - Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); + Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()), - BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), - true))), StandardOpenOption.TRUNCATE_EXISTING); + BytesReference.toBytes(BytesReference.bytes( + withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), + StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> verify that repo is assumed in old metadata format"); final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); @@ -403,11 +405,12 @@ public void testRepairBrokenShardGenerations() throws IOException { Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)), repositoryData1.getIndices().values().stream().collect( Collectors.toMap(Function.identity(), repositoryData1::getSnapshots) - ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build() + ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(), + repositoryData1.indexMetaDataGenerations() ); Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()), BytesReference.toBytes(BytesReference.bytes( - brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))), + brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> recreating repository to clear caches"); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 7e33d11d1dba..71ce2cfe4f4c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -73,9 +73,11 @@ import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -996,6 +998,8 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { SnapshotStats stats = snapshots.get(0).getStats(); + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount)); assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize)); @@ -1023,6 +1027,10 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { .get(); final List snapshot1Files = scanSnapshotFolder(repoPath); + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); final int snapshot1FileCount = snapshot1Files.size(); final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files); @@ -1047,6 +1055,65 @@ public void testSnapshotTotalAndIncrementalSizes() throws IOException { assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize)); } + public void testDeduplicateIndexMetadata() throws Exception { + final String indexName = "test-blocks-1"; + final String repositoryName = "repo-" + indexName; + final String snapshot0 = "snapshot-0"; + final String snapshot1 = "snapshot-1"; + final String snapshot2 = "snapshot-2"; + + createIndex(indexName); + + int docs = between(10, 100); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet(); + } + + final Path repoPath = randomRepoPath(); + createRepository(repositoryName, "fs", repoPath); + + logger.info("--> create a snapshot"); + client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0) + .setIncludeGlobalState(true) + .setWaitForCompletion(true) + .get(); + + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index + + docs = between(1, 5); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet(); + } + + logger.info("--> restart random data node and add new data node to change index allocation"); + internalCluster().restartRandomDataNode(); + internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); + + // index to some other field to trigger a change in index metadata + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet(); + } + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get()); + final List snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots + } + public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { logger.info("--> starting a master node and two data nodes"); internalCluster().startMasterOnlyNode(); @@ -1256,6 +1323,22 @@ private long calculateTotalFilesSize(List files) { }).sum(); } + private static List findRepoMetaBlobs(Path repoPath) throws IOException { + List files = new ArrayList<>(); + Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + final String fileName = file.getFileName().toString(); + if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) { + files.add(file); + } + return super.visitFile(file, attrs); + } + } + ); + return files; + } + private List scanSnapshotFolder(Path repoPath) throws IOException { List files = new ArrayList<>(); Files.walkFileTree(repoPath, new SimpleFileVisitor(){ diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index c12a0b9b7648..2850eb777dca 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -34,6 +35,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -198,9 +200,10 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, + IndexId indexId) throws IOException { indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet(); - return super.getSnapshotIndexMetadata(snapshotId, indexId); + return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 2666b2434198..7234c192209f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2546,7 +2546,8 @@ public void testRestoreSnapshotWithCorruptedIndexMetadata() throws Exception { final IndexId corruptedIndex = randomFrom(indexIds.values()); final Path indexMetadataPath = repo.resolve("indices") .resolve(corruptedIndex.getId()) - .resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat"); + .resolve( + "meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat"); // Truncate the index metadata file try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index c9c4a4696389..e2dea018b4ff 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -334,7 +334,7 @@ private Map snapshotShards(final String repos final Map shardStatus = new HashMap<>(); for (String index : snapshotInfo.indices()) { IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId); + IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); if (indexMetadata != null) { int numberOfShards = indexMetadata.getNumberOfShards(); for (int i = 0; i < numberOfShards; i++) { diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b6ac4958975c..588b85f23d59 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -70,8 +70,8 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { - return in.getSnapshotIndexMetadata(snapshotId, index); + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { + return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java new file mode 100644 index 000000000000..bc1b6ae8b436 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java @@ -0,0 +1,177 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.snapshots.SnapshotId; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Tracks the blob uuids of blobs containing {@link IndexMetadata} for snapshots as well an identifier for each of these blobs. + * Before writing a new {@link IndexMetadata} blob during snapshot finalization in + * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#finalizeSnapshot} the identifier for an instance of + * {@link IndexMetadata} should be computed and then used to check if it already exists in the repository via + * {@link #getIndexMetaBlobId(String)}. + */ +public final class IndexMetaDataGenerations { + + public static final IndexMetaDataGenerations EMPTY = new IndexMetaDataGenerations(Collections.emptyMap(), Collections.emptyMap()); + + /** + * Map of {@link SnapshotId} to a map of the indices in a snapshot mapping {@link IndexId} to metadata identifiers. + * The identifiers in the nested map can be mapped to the relevant blob uuid via {@link #getIndexMetaBlobId}. + */ + final Map> lookup; + + /** + * Map of index metadata identifier to blob uuid. + */ + final Map identifiers; + + IndexMetaDataGenerations(Map> lookup, Map identifiers) { + assert identifiers.keySet().equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet())) : + "identifier mappings " + identifiers + " don't track the same blob ids as the lookup map " + lookup; + assert lookup.values().stream().noneMatch(Map::isEmpty) : "Lookup contained empty map [" + lookup + "]"; + this.lookup = Collections.unmodifiableMap(lookup); + this.identifiers = Collections.unmodifiableMap(identifiers); + } + + public boolean isEmpty() { + return identifiers.isEmpty(); + } + + /** + * Gets the blob id by the identifier of {@link org.elasticsearch.cluster.metadata.IndexMetadata} + * (computed via {@link #buildUniqueIdentifier}) or {@code null} if none is tracked for the identifier. + * + * @param metaIdentifier identifier for {@link IndexMetadata} + * @return blob id for the given metadata identifier or {@code null} if the identifier is not part of the repository yet + */ + @Nullable + public String getIndexMetaBlobId(String metaIdentifier) { + return identifiers.get(metaIdentifier); + } + + /** + * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is + * known to enable backwards compatibility with versions older than + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata + * blob uuid. + * + * @param snapshotId Snapshot Id + * @param indexId Index Id + * @return blob id for the given index metadata + */ + public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { + final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); + if (identifier == null) { + return snapshotId.getUUID(); + } else { + return identifiers.get(identifier); + } + } + + /** + * Create a new instance with the given snapshot and index metadata uuids and identifiers added. + * + * @param snapshotId SnapshotId + * @param newLookup new mappings of index + snapshot to index metadata identifier + * @param newIdentifiers new mappings of index metadata identifier to blob id + * @return instance with added snapshot + */ + public IndexMetaDataGenerations withAddedSnapshot(SnapshotId snapshotId, Map newLookup, + Map newIdentifiers) { + final Map> updatedIndexMetaLookup = new HashMap<>(this.lookup); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.putAll(newIdentifiers); + updatedIndexMetaLookup.compute(snapshotId, (snId, lookup) -> { + if (lookup == null) { + if (newLookup.isEmpty()) { + return null; + } + return Collections.unmodifiableMap(new HashMap<>(newLookup)); + } else { + final Map updated = new HashMap<>(lookup); + updated.putAll(newLookup); + return Collections.unmodifiableMap(updated); + } + }); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + /** + * Create a new instance with the given snapshot removed. + * + * @param snapshotIds SnapshotIds to remove + * @return new instance without the given snapshot + */ + public IndexMetaDataGenerations withRemovedSnapshots(Collection snapshotIds) { + final Map> updatedIndexMetaLookup = new HashMap<>(lookup); + updatedIndexMetaLookup.keySet().removeAll(snapshotIds); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.keySet().removeIf( + k -> updatedIndexMetaLookup.values().stream().noneMatch(identifiers -> identifiers.containsValue(k))); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + @Override + public int hashCode() { + return Objects.hash(identifiers, lookup); + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that instanceof IndexMetaDataGenerations == false) { + return false; + } + final IndexMetaDataGenerations other = (IndexMetaDataGenerations) that; + return lookup.equals(other.lookup) && identifiers.equals(other.identifiers); + } + + @Override + public String toString() { + return "IndexMetaDataGenerations{lookup:" + lookup + "}{identifier:" + identifiers + "}"; + } + + /** + * Compute identifier for {@link IndexMetadata} from its index- and history-uuid as well as its settings-, mapping- and alias-version. + * If an index did not see a change in its settings, mappings or aliases between two points in time then the identifier will not change + * between them either. + * + * @param indexMetaData IndexMetaData + * @return identifier string + */ + public static String buildUniqueIdentifier(IndexMetadata indexMetaData) { + return indexMetaData.getIndexUUID() + + "-" + indexMetaData.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID, IndexMetadata.INDEX_UUID_NA_VALUE) + + "-" + indexMetaData.getSettingsVersion() + "-" + indexMetaData.getMappingVersion() + + "-" + indexMetaData.getAliasesVersion(); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1fb3bca5fc69..727ee7eef104 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -102,11 +102,12 @@ default Repository create(RepositoryMetadata metadata, Function snapshotVersions; + /** + * Index metadata generations. + */ + private final IndexMetaDataGenerations indexMetaDataGenerations; + /** * Shard generations. */ @@ -100,7 +106,7 @@ public final class RepositoryData { public RepositoryData(long genId, Map snapshotIds, Map snapshotStates, Map snapshotVersions, Map> indexSnapshots, - ShardGenerations shardGenerations) { + ShardGenerations shardGenerations, IndexMetaDataGenerations indexMetaDataGenerations) { this.genId = genId; this.snapshotIds = Collections.unmodifiableMap(snapshotIds); this.snapshotStates = Collections.unmodifiableMap(snapshotStates); @@ -108,6 +114,7 @@ public RepositoryData(long genId, Map snapshotIds, Map snapshotIds, Map versions) { } final Map newVersions = new HashMap<>(snapshotVersions); versions.forEach((id, version) -> newVersions.put(id.getUUID(), version)); - return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } public ShardGenerations shardGenerations() { @@ -198,6 +207,32 @@ public List indicesToUpdateAfterRemovingSnapshot(Collection }).map(Map.Entry::getKey).collect(Collectors.toList()); } + /** + * Returns a map of {@link IndexId} to a collection of {@link String} containing all the {@link IndexId} and the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} blob name in it that can be removed after removing the given snapshot from + * the repository. + * NOTE: Does not return a mapping for {@link IndexId} values that will be removed completely from the repository. + * + * @param snapshotIds SnapshotIds to remove + * @return map of index to index metadata blob id to delete + */ + public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { + Collection indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); + final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet().stream() + .filter(e -> snapshotIds.contains(e.getKey()) == false).flatMap(e -> e.getValue().values().stream()) + .map(indexMetaDataGenerations::getIndexMetaBlobId).collect(Collectors.toSet()); + final Map> toRemove = new HashMap<>(); + for (IndexId indexId : indicesForSnapshot) { + for (SnapshotId snapshotId : snapshotIds) { + final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId); + if (allRemainingIdentifiers.contains(identifier) == false) { + toRemove.computeIfAbsent(indexId, k -> new HashSet<>()).add(identifier); + } + } + } + return toRemove; + } + /** * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot * already exists in the repository data, this method throws an IllegalArgumentException. @@ -206,11 +241,16 @@ public List indicesToUpdateAfterRemovingSnapshot(Collection * @param snapshotState State of the new snapshot * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new * generations indexed by the shard id they correspond to must be supplied. + * @param indexMetaBlobs Map of index metadata blob uuids + * @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} in them */ public RepositoryData addSnapshot(final SnapshotId snapshotId, final SnapshotState snapshotState, final Version version, - final ShardGenerations shardGenerations) { + final ShardGenerations shardGenerations, + @Nullable final Map indexMetaBlobs, + @Nullable final Map newIdentifiers) { if (snapshotIds.containsKey(snapshotId.getUUID())) { // if the snapshot id already exists in the repository data, it means an old master // that is blocked from the cluster is trying to finalize a snapshot concurrently with @@ -235,8 +275,23 @@ public RepositoryData addSnapshot(final SnapshotId snapshotId, allIndexSnapshots.put(indexId, Collections.unmodifiableList(copy)); } } + + final IndexMetaDataGenerations newIndexMetaGenerations; + if (indexMetaBlobs == null) { + assert newIdentifiers == null : "Non-null new identifiers [" + newIdentifiers + "] for null lookup"; + assert indexMetaDataGenerations.lookup.isEmpty() : + "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]"; + newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY; + } else { + assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet()) : + "Shard generations contained indices " + shardGenerations.indices() + + " but indexMetaData was given for " + indexMetaBlobs.keySet(); + newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers); + } + return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots, - ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build()); + ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(), + newIndexMetaGenerations); } /** @@ -249,7 +304,8 @@ public RepositoryData withGenId(long newGeneration) { if (newGeneration == genId) { return this; } - return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -291,7 +347,8 @@ public RepositoryData removeSnapshots(final Collection snapshots, fi return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots, ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations) - .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build() + .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build(), + indexMetaDataGenerations.withRemovedSnapshots(snapshots) ); } @@ -320,12 +377,14 @@ public boolean equals(Object obj) { && snapshotVersions.equals(that.snapshotVersions) && indices.equals(that.indices) && indexSnapshots.equals(that.indexSnapshots) - && shardGenerations.equals(that.shardGenerations); + && shardGenerations.equals(that.shardGenerations) + && indexMetaDataGenerations.equals(that.indexMetaDataGenerations); } @Override public int hashCode() { - return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations); + return Objects.hash( + snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -366,6 +425,8 @@ public List resolveNewIndices(final List indicesToResolve) { } private static final String SHARD_GENERATIONS = "shard_generations"; + private static final String INDEX_METADATA_IDENTIFIERS = "index_metadata_identifiers"; + private static final String INDEX_METADATA_LOOKUP = "index_metadata_lookup"; private static final String SNAPSHOTS = "snapshots"; private static final String INDICES = "indices"; private static final String INDEX_ID = "id"; @@ -378,10 +439,12 @@ public List resolveNewIndices(final List indicesToResolve) { /** * Writes the snapshots metadata and the related indices metadata to x-content. */ - public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException { + public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException { builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); + final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); + final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); for (final SnapshotId snapshot : getSnapshotIds()) { builder.startObject(); builder.field(NAME, snapshot.getName()); @@ -389,6 +452,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final if (snapshotStates.containsKey(snapshot.getUUID())) { builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value()); } + if (shouldWriteIndexGens) { + builder.field(INDEX_METADATA_LOOKUP, indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) + .entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))); + } if (snapshotVersions.containsKey(snapshot.getUUID())) { builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString()); } @@ -417,7 +484,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final builder.endObject(); } builder.endObject(); - if (shouldWriteShardGens) { + if (shouldWriteIndexGens) { + builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString()); + builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); + } else if (shouldWriteShardGens) { // Add min version field to make it impossible for older ES versions to deserialize this object builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString()); } @@ -425,6 +495,10 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final return builder; } + public IndexMetaDataGenerations indexMetaDataGenerations() { + return indexMetaDataGenerations; + } + /** * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata. * @@ -438,6 +512,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g final Map snapshotVersions = new HashMap<>(); final Map> indexSnapshots = new HashMap<>(); final ShardGenerations.Builder shardGenerations = ShardGenerations.builder(); + final Map indexMetaIdentifiers = new HashMap<>(); + final Map> indexMetaLookup = new HashMap<>(); if (parser.nextToken() == XContentParser.Token.START_OBJECT) { while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { @@ -448,6 +524,7 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g String name = null; String uuid = null; SnapshotState state = null; + Map metaGenerations = new HashMap<>(); Version version = null; while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String currentFieldName = parser.currentName(); @@ -458,6 +535,8 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g uuid = parser.text(); } else if (STATE.equals(currentFieldName)) { state = SnapshotState.fromValue(parser.numberValue().byteValue()); + } else if (INDEX_METADATA_LOOKUP.equals(currentFieldName)) { + metaGenerations.putAll(parser.mapStrings()); } else if (VERSION.equals(currentFieldName)) { version = Version.fromString(parser.text()); } @@ -470,6 +549,9 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g snapshotVersions.put(uuid, version); } snapshots.put(snapshotId.getUUID(), snapshotId); + if (metaGenerations.isEmpty() == false) { + indexMetaLookup.put(snapshotId, metaGenerations); + } } } else { throw new ElasticsearchParseException("expected array for [" + field + "]"); @@ -545,6 +627,11 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g } } } + } else if (INDEX_METADATA_IDENTIFIERS.equals(field)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("start object expected [" + INDEX_METADATA_IDENTIFIERS + "]"); + } + indexMetaIdentifiers.putAll(parser.mapStrings()); } else if (MIN_VERSION.equals(field)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new ElasticsearchParseException("version string expected [min_version]"); @@ -558,7 +645,12 @@ public static RepositoryData snapshotsFromXContent(XContentParser parser, long g } else { throw new ElasticsearchParseException("start object expected"); } - return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build()); + final Map indexLookup = + indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getId, Function.identity())); + return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build(), + new IndexMetaDataGenerations(indexMetaLookup.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream() + .collect(Collectors.toMap(entry -> indexLookup.get(entry.getKey()), Map.Entry::getValue)))), indexMetaIdentifiers)); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 89ccf6824d47..97ecfaf3a352 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -79,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -100,6 +101,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; @@ -579,7 +581,7 @@ protected void doRun() throws Exception { // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + repositoryMetaVersion, listener); } @Override @@ -639,10 +641,10 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map snapshotIds, long repositoryStateId, Map foundIndices, - Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, + Map rootBlobs, RepositoryData repositoryData, Version repoMetaVersion, ActionListener listener) { - if (writeShardGens) { + if (SnapshotsService.useShardGenerations(repoMetaVersion)) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); @@ -660,7 +662,7 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); } final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(), + writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); }, listener::onFailure); // Once we have updated the repository, run the clean-ups @@ -669,12 +671,13 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), + afterCleanupsListener); }, listener::onFailure); } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> { + writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); @@ -682,7 +685,7 @@ private void doDeleteShardSnapshots(Collection snapshotIds, long rep final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener), + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure); }, listener::onFailure)); } @@ -696,14 +699,14 @@ private void asyncCleanupUnlinkedRootAndIndicesBlobs(Collection dele l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null)))); } - private void asyncCleanupUnlinkedShardLevelBlobs(Collection snapshotIds, + private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults, ActionListener listener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( listener, l -> { try { - deleteFromContainer(blobContainer(), resolveFilesToDelete(snapshotIds, deleteResults)); + deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults)); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -735,14 +738,18 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection s final Set survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream() .filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet()); final StepListener> shardCountListener = new StepListener<>(); - final ActionListener allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size()); - for (SnapshotId snapshotId : snapshotIds) { + final Collection indexMetaGenerations = snapshotIds.stream().map( + id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)).collect(Collectors.toSet()); + final ActionListener allShardCountsListener = + new GroupedActionListener<>(shardCountListener, indexMetaGenerations.size()); + final BlobContainer indexContainer = indexContainer(indexId); + for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards(); + return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage( - "[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); + "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex); // Just invoke the listener without any shard generations to count it down, this index will be cleaned up // by the stale data cleanup in the end. // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just @@ -797,20 +804,22 @@ public void onFailure(Exception ex) { } } - private List resolveFilesToDelete(Collection snapshotIds, + private List resolveFilesToDelete(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults) { final String basePath = basePath().buildAsString(); final int basePathLen = basePath.length(); + final Map> indexMetaGenerations = + oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds); return Stream.concat( - deleteResults.stream().flatMap(shardResult -> { - final String shardPath = - shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); - }), - deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> { - final String indexContainerPath = indexContainer(indexId).path().buildAsString(); - return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID())); - }) + deleteResults.stream().flatMap(shardResult -> { + final String shardPath = + shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), + indexMetaGenerations.entrySet().stream().flatMap(entry -> { + final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString(); + return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id)); + }) ).map(absolutePath -> { assert absolutePath.startsWith(basePath); return absolutePath.substring(basePathLen); @@ -854,6 +863,7 @@ private void cleanupStaleBlobs(Collection deletedSnapshots, Map *
  • Deleting stale indices {@link #cleanupStaleIndices}
  • *
  • Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}
  • @@ -878,7 +888,7 @@ public void cleanup(long repositoryStateId, Version repositoryMetaVersion, Actio listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); } else { // write new index-N blob to ensure concurrent operations will fail - writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion), + writeIndexGen(repositoryData, repositoryStateId, repositoryMetaVersion, Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(Collections.emptyList(), foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } @@ -1002,49 +1012,82 @@ public void finalizeSnapshot(final SnapshotId snapshotId, final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); final Consumer onUpdateFailure = e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)); - final ActionListener allMetaListener = new GroupedActionListener<>( - ActionListener.wrap(snapshotInfos -> { - assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; - final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); - getRepositoryData(ActionListener.wrap(existingRepositoryData -> { - final RepositoryData updatedRepositoryData = - existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer, - ActionListener.wrap(writtenRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } - listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); - }, onUpdateFailure)); - }, onUpdateFailure)); - }, onUpdateFailure), 2 + indices.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will - // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the - // index or global metadata will be compatible with the segments written in this snapshot as well. - // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way - // that decrements the generation it points at + final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - // Write Global Metadata - executor.execute(ActionRunnable.run(allMetaListener, - () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); + final StepListener repoDataListener = new StepListener<>(); + executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData)); + repoDataListener.whenComplete(existingRepositoryData -> { - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { - executor.execute(ActionRunnable.run(allMetaListener, () -> - indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false))); - } + final Map indexMetas; + final Map indexMetaIdentifiers; + if (writeIndexGens) { + indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); + indexMetas = ConcurrentCollections.newConcurrentMap(); + } else { + indexMetas = null; + indexMetaIdentifiers = null; + } + + final ActionListener allMetaListener = new GroupedActionListener<>( + ActionListener.wrap(snapshotInfos -> { + assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; + final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); + final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( + snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers); + writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer, + ActionListener.wrap( + newRepoData -> { + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } + listener.onResponse(new Tuple<>(newRepoData, snapshotInfo)); + }, onUpdateFailure)); + }, onUpdateFailure), 2 + indices.size()); + + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. + // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way + // that decrements the generation it points at + + // Write Global MetaData + executor.execute(ActionRunnable.run(allMetaListener, + () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); - executor.execute(ActionRunnable.supply(allMetaListener, () -> { - final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, - indices.stream().map(IndexId::getName).collect(Collectors.toList()), + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.run(allMetaListener, () -> { + final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); + if (writeIndexGens) { + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false); + indexMetaIdentifiers.put(identifiers, metaUUID); + } + indexMetas.put(index, identifiers); + } else { + indexMetadataFormat.write( + clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); + } + } + )); + } + executor.execute(ActionRunnable.supply(allMetaListener, () -> { + final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, + indices.stream().map(IndexId::getName).collect(Collectors.toList()), new ArrayList<>(clusterMetadata.dataStreams().keySet()), - startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, - includeGlobalState, userMetadata); - snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); - return snapshotInfo; - })); + startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, + includeGlobalState, userMetadata); + snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); + return snapshotInfo; + })); + }, onUpdateFailure); } // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data @@ -1084,9 +1127,10 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(final SnapshotId snapshotId, final IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { - return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID()); + return indexMetadataFormat.read(indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index)); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -1245,7 +1289,7 @@ private void doGetRepositoryData(ActionListener listener) { loaded = getRepositoryData(genToLoad); // We can cache in the most recent version here without regard to the actual repository metadata version since we're // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe - cacheRepositoryData(loaded, true); + cacheRepositoryData(loaded, Version.CURRENT); } listener.onResponse(loaded); return; @@ -1280,17 +1324,17 @@ private void doGetRepositoryData(ActionListener listener) { * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given * generation will always contain the same {@link RepositoryData}. * - * @param updated RepositoryData to cache if newer than the cache contents - * @param writeShardGens whether to cache shard generation values + * @param updated RepositoryData to cache if newer than the cache contents + * @param version version of the repository metadata that was cached */ - private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) { + private void cacheRepositoryData(RepositoryData updated, Version version) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; BytesStreamOutput out = new BytesStreamOutput(); try { try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out); XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) { - updated.snapshotsToXContent(builder, writeShardGens); + updated.snapshotsToXContent(builder, version); } serialized = out.bytes(); final int len = serialized.length(); @@ -1423,11 +1467,11 @@ public boolean isReadOnly() { * * @param repositoryData RepositoryData to write * @param expectedGen expected repository generation at the start of the operation - * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob + * @param version version of the repository metadata to write * @param stateFilter filter for the last cluster state update executed by this method * @param listener completion listener */ - protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, + protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, Version version, Function stateFilter, ActionListener listener) { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = repositoryData.getGenId(); @@ -1538,7 +1582,8 @@ public void onFailure(Exception e) { final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); writeAtomic(indexBlob, - BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); + BytesReference.bytes( + filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true); // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { @@ -1579,7 +1624,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen); - cacheRepositoryData(writtenRepositoryData, writeShardGens); + cacheRepositoryData(writtenRepositoryData, version); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { // Delete all now outdated index files up to 1000 blobs back from the new generation. // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them. diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index 9a55a515ad48..8b8e7c0e5d12 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -246,7 +246,7 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi final List indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot); for (IndexId indexId : indexIdsInSnapshot) { - metadataBuilder.put(repository.getSnapshotIndexMetadata(snapshotId, indexId), false); + metadataBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId), false); } final Metadata metadata = metadataBuilder.build(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 7959901b0894..00029f8ef33a 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -122,6 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0; + public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0; + public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0; public static final Version MULTI_DELETE_VERSION = Version.V_7_8_0; @@ -1505,7 +1507,16 @@ public static boolean useShardGenerations(Version repositoryMetaVersion) { } /** - * Deletes snapshot from repository + * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports {@link ShardGenerations} + */ + public static boolean useIndexGenerations(Version repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION); + } + + /** Deletes snapshot from repository * * @param repoName repository name * @param snapshotIds snapshot ids diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index e6bab4428d74..38afb94f8152 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -49,7 +49,6 @@ import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -151,7 +150,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { return null; } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index 33c67f77ebcb..584956bbffcd 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -74,7 +77,7 @@ public void testIndicesToUpdateAfterRemovingSnapshot() { public void testXContent() throws IOException { RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = JsonXContent.contentBuilder(); - repositoryData.snapshotsToXContent(builder, true); + repositoryData.snapshotsToXContent(builder, Version.CURRENT); try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) { long gen = (long) randomIntBetween(0, 500); RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen, randomBoolean()); @@ -106,9 +109,14 @@ public void testAddSnapshots() { indices.add(indexId); builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = + shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot, randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build()); + randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + shardGenerations, indexLookup, + indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); // verify that the new repository data has the new snapshot and its indices assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot)); for (IndexId indexId : indices) { @@ -132,12 +140,12 @@ public void testInitIndices() { snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values())); snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion())); } - RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, - Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); + RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, Collections.emptyMap(), Collections.emptyMap(), + Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); // test that initializing indices works Map> indices = randomIndices(snapshotIds); - RepositoryData newRepoData = - new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY); + RepositoryData newRepoData = new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, + ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); List expected = new ArrayList<>(repositoryData.getSnapshotIds()); Collections.sort(expected); List actual = new ArrayList<>(newRepoData.getSnapshotIds()); @@ -153,7 +161,8 @@ public void testRemoveSnapshot() { List snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds()); assertThat(snapshotIds.size(), greaterThan(0)); SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1)); - RepositoryData newRepositoryData = repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY); + RepositoryData newRepositoryData = + repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY); // make sure the repository data's indices no longer contain the removed snapshot for (final IndexId indexId : newRepositoryData.getIndices().values()) { assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId)); @@ -173,8 +182,9 @@ public void testResolveIndexId() { public void testGetSnapshotState() { final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()); final SnapshotState state = randomFrom(SnapshotState.values()); - final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state, - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY); + final RepositoryData repositoryData = + RepositoryData.EMPTY.addSnapshot(snapshotId, state, randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + ShardGenerations.EMPTY, Collections.emptyMap(), Collections.emptyMap()); assertEquals(state, repositoryData.getSnapshotState(snapshotId)); assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()))); } @@ -184,7 +194,7 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { final RepositoryData repositoryData = generateRandomRepoData(); XContentBuilder builder = XContentBuilder.builder(xContent); - repositoryData.snapshotsToXContent(builder, true); + repositoryData.snapshotsToXContent(builder, Version.CURRENT); RepositoryData parsedRepositoryData; try (XContentParser xParser = createParser(builder)) { parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId(), randomBoolean()); @@ -219,10 +229,10 @@ public void testIndexThatReferencesAnUnknownSnapshot() throws IOException { assertNotNull(corruptedIndexId); RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates, - snapshotVersions, indexSnapshots, shardGenBuilder.build()); + snapshotVersions, indexSnapshots, shardGenBuilder.build(), IndexMetaDataGenerations.EMPTY); final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent); - corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true); + corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, Version.CURRENT); try (XContentParser xParser = createParser(corruptedBuilder)) { ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () -> @@ -269,6 +279,57 @@ public void testIndexThatReferenceANullSnapshot() throws IOException { } } + // Test removing snapshot from random data where no two snapshots share any index metadata blobs + public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() { + final RepositoryData repositoryData = generateRandomRepoData(); + final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds()); + final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations(); + final Collection indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId)); + final Map> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId).entrySet().stream() + .filter(e -> indicesToUpdate.contains(e.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> Collections.singleton(indexMetaDataGenerations.getIndexMetaBlobId(e.getValue())))); + assertEquals(repositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(snapshotId)), identifiersToRemove); + } + + // Test removing snapshot from random data that has some or all index metadata shared + public void testIndexMetaDataToRemoveAfterRemovingSnapshotWithSharing() { + final RepositoryData repositoryData = generateRandomRepoData(); + final ShardGenerations.Builder builder = ShardGenerations.builder(); + final SnapshotId otherSnapshotId = randomFrom(repositoryData.getSnapshotIds()); + final Collection indicesInOther = repositoryData.getIndices().values() + .stream() + .filter(index -> repositoryData.getSnapshots(index).contains(otherSnapshotId)) + .collect(Collectors.toSet()); + for (IndexId indexId : indicesInOther) { + builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); + } + final Map newIndices = new HashMap<>(); + final Map newIdentifiers = new HashMap<>(); + final Map> removeFromOther = new HashMap<>(); + for (IndexId indexId : randomSubsetOf(repositoryData.getIndices().values())) { + if (indicesInOther.contains(indexId)) { + removeFromOther.put(indexId, Collections.singleton( + repositoryData.indexMetaDataGenerations().indexMetaBlobId(otherSnapshotId, indexId))); + } + final String identifier = randomAlphaOfLength(20); + newIndices.put(indexId, identifier); + newIdentifiers.put(identifier, UUIDs.randomBase64UUID(random())); + builder.put(indexId, 0, UUIDs.randomBase64UUID(random())); + } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = new HashMap<>(repositoryData.indexMetaDataGenerations().lookup.get(otherSnapshotId)); + indexLookup.putAll(newIndices); + final SnapshotId newSnapshot = new SnapshotId(randomAlphaOfLength(7), UUIDs.randomBase64UUID(random())); + + RepositoryData newRepoData = + repositoryData.addSnapshot(newSnapshot, SnapshotState.SUCCESS, Version.CURRENT, shardGenerations, indexLookup, newIdentifiers); + assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(newSnapshot)), + newIndices.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + e -> Collections.singleton(newIdentifiers.get(e.getValue()))))); + assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(otherSnapshotId)), removeFromOther); + } + public static RepositoryData generateRandomRepoData() { final int numIndices = randomIntBetween(1, 30); final List indices = new ArrayList<>(numIndices); @@ -288,8 +349,14 @@ public static RepositoryData generateRandomRepoData() { builder.put(someIndex, j, uuid); } } - repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()), - randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build()); + final Map indexLookup = + someIndices.stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); + repositoryData = repositoryData.addSnapshot( + snapshotId, randomFrom(SnapshotState.values()), + randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), + builder.build(), + indexLookup, + indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); } return repositoryData; } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index ba142f6cafef..c086bbb42e00 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -200,7 +200,7 @@ public void testRepositoryDataConcurrentModificationNotAllowed() { RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); final PlainActionFuture future1 = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1); + repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(),future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created @@ -241,8 +241,8 @@ public void testFsRepositoryCompressDeprecated() { } private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { - PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true, - Function.identity(), f)); + PlainActionFuture.get( + f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f)); } private BlobStoreRepository setupRepo() { @@ -273,8 +273,13 @@ private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boo for (int j = 0; j < numIndices; j++) { builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1"); } + final ShardGenerations shardGenerations = builder.build(); + final Map indexLookup = + shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256))); repoData = repoData.addSnapshot(snapshotId, - randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build()); + randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, shardGenerations, + indexLookup, + indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random())))); } return repoData; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index e002fc197b2a..3e81f15bfd8d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -778,7 +778,7 @@ public void onFailure(Exception e) { assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices))); if (snapshotInfo.successfulShards() == indices + 1) { final IndexMetadata indexMetadata = - repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index)); + repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index)); // Make sure we snapshotted the metadata of this index and not the recreated version assertEquals(indexMetadata.getIndex(), firstIndex.get()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 9c9b144e94cf..a3d046efb400 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.index.store.Store; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; @@ -40,7 +41,6 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -87,7 +87,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { return null; } @@ -95,7 +95,7 @@ public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId ind public void getRepositoryData(ActionListener listener) { final IndexId indexId = new IndexId(indexName, "blah"); listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), - Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY)); + Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY)); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index c3812602baf3..a8b664b345b5 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -61,6 +61,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -122,7 +123,7 @@ public static void assertConsistency(BlobStoreRepository repository, Executor ex LoggingDeprecationHandler.INSTANCE, blob)) { repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false); } - assertIndexUUIDs(blobContainer, repositoryData); + assertIndexUUIDs(repository, repositoryData); assertSnapshotUUIDs(repository, repositoryData); assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations()); return null; @@ -165,10 +166,10 @@ private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGen } } - private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { + private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException { final List expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); - final BlobContainer indicesContainer = repoRoot.children().get("indices"); + final BlobContainer indicesContainer = repository.blobContainer().children().get("indices"); final List foundIndexUUIDs; if (indicesContainer == null) { foundIndexUUIDs = Collections.emptyList(); @@ -178,6 +179,21 @@ private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repo s -> s.startsWith("extra") == false).collect(Collectors.toList()); } assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); + for (String indexId : foundIndexUUIDs) { + final Set indexMetaGenerationsFound = indicesContainer.children().get(indexId) + .listBlobsByPrefix(BlobStoreRepository.METADATA_PREFIX).keySet().stream() + .map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", "")) + .collect(Collectors.toSet()); + final Set indexMetaGenerationsExpected = new HashSet<>(); + final IndexId idx = + repositoryData.getIndices().values().stream().filter(i -> i.getId().equals(indexId)).findFirst().get(); + for (SnapshotId snapshotId : repositoryData.getSnapshots(idx)) { + indexMetaGenerationsExpected.add(repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, idx)); + } + // TODO: assertEquals(indexMetaGenerationsExpected, indexMetaGenerationsFound); requires cleanup functionality for + // index meta generations blobs + assertTrue(indexMetaGenerationsFound.containsAll(indexMetaGenerationsExpected)); + } } private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException { @@ -208,8 +224,9 @@ private static void assertSnapshotUUIDs(BlobStoreRepository repository, Reposito assertThat(indices, hasKey(indexId.getId())); final BlobContainer indexContainer = indices.get(indexId.getId()); assertThat(indexContainer.listBlobs(), - hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID()))); - final IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotId, indexId); + hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId)))); + final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId); for (Map.Entry entry : indexContainer.children().entrySet()) { // Skip Lucene MockFS extraN directory if (entry.getKey().startsWith("extra")) { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index d36c69d0c6b3..3268318f5f72 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -53,7 +53,6 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.test.VersionUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.io.IOException; @@ -325,8 +324,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version logger.info("--> writing downgraded RepositoryData for repository metadata version [{}]", version); final RepositoryData repositoryData = getRepositoryData(repoName); final XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); - final boolean writeShardGens = version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); - repositoryData.snapshotsToXContent(jsonBuilder, writeShardGens); + repositoryData.snapshotsToXContent(jsonBuilder, version); final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser( NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, @@ -334,7 +332,7 @@ protected String initWithSnapshotVersion(String repoName, Path repoPath, Version repositoryData.getGenId(), randomBoolean()); Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()), BytesReference.toBytes(BytesReference.bytes( - downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens))), + downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), version))), StandardOpenOption.TRUNCATE_EXISTING); return oldVersionSnapshot; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 755683bf651d..f21a432ce070 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.MultiFileWriter; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; @@ -193,7 +194,7 @@ public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; String leaderIndex = index.getName(); Client remoteClient = getRemoteClusterClient(); @@ -256,7 +257,8 @@ public void getRepositoryData(ActionListener listener) { Index index = remoteIndices.get(indexName).getIndex(); indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId)); } - return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY); + return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, + ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); }); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 91140d57f2ea..43f7f97cd571 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -237,7 +237,8 @@ public void testRestoreMinmal() throws IOException { ShardRoutingState.INITIALIZING, new RecoverySource.SnapshotRecoverySource( UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId)); - IndexMetadata metadata = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetadata(snapshotId, indexId)); + IndexMetadata metadata = runAsSnapshot(threadPool, () -> + repository.getSnapshotIndexMetaData(PlainActionFuture.get(repository::getRepositoryData), snapshotId, indexId)); IndexShard restoredShard = newShard( shardRouting, metadata, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}, RetentionLeaseSyncer.EMPTY); DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);