diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 63f0b467748a1..1f14f074886f1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -433,7 +433,7 @@ private RepositoryData safeRepositoryData(long repositoryStateId, Map foundIndices, Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, - ActionListener listener) throws IOException { + ActionListener listener) { if (writeShardGens) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets @@ -448,14 +448,14 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI // written if all shard paths have been successfully updated. final StepListener writeUpdatedRepoDataStep = new StepListener<>(); writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { - final ShardGenerations.Builder builder = ShardGenerations.builder(); - for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { - builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); - } - final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true); - writeUpdatedRepoDataStep.onResponse(updatedRepoData); - }, listener::onFailure); + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); + } + final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); + writeIndexGen(updatedRepoData, repositoryStateId, true, + ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); + }, listener::onFailure); // Once we have updated the repository, run the clean-ups writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion @@ -467,15 +467,17 @@ private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateI } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, false); - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - final ActionListener afterCleanupsListener = - new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); - asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep); - writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> - asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure); + writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + final ActionListener afterCleanupsListener = + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); + asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); + final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); + writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep); + writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> + asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener), + afterCleanupsListener::onFailure); + }, listener::onFailure)); } } @@ -656,8 +658,9 @@ public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListen listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); } else { // write new index-N blob to ensure concurrent operations will fail - writeIndexGen(repositoryData, repositoryStateId, writeShardGens); - cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)); + writeIndexGen(repositoryData, repositoryStateId, writeShardGens, + ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, + ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } } catch (Exception e) { listener.onFailure(e); @@ -768,11 +771,12 @@ public void finalizeSnapshot(final SnapshotId snapshotId, getRepositoryData(ActionListener.wrap(existingRepositoryData -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens); - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } - listener.onResponse(snapshotInfo); + writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> { + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } + listener.onResponse(snapshotInfo); + }, onUpdateFailure)); }, onUpdateFailure)); }, onUpdateFailure), 2 + indices.size()); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); @@ -1001,50 +1005,58 @@ public boolean isReadOnly() { return readOnly; } - protected void writeIndexGen(final RepositoryData repositoryData, final long expectedGen, - final boolean writeShardGens) throws IOException { - assert isReadOnly() == false; // can not write to a read only repository - final long currentGen = repositoryData.getGenId(); - if (currentGen != expectedGen) { - // the index file was updated by a concurrent operation, so we were operating on stale - // repository data - throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" + - expectedGen + "], actual current generation [" + currentGen + - "] - possibly due to simultaneous snapshot deletion requests"); - } - final long newGen = currentGen + 1; - if (latestKnownRepoGen.get() >= newGen) { - throw new IllegalArgumentException( - "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already"); - } - // write the index file - final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); - logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); - writeAtomic(indexBlob, - BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); - final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen)); - if (newGen < latestKnownGen) { - // Don't mess up the index.latest blob - throw new IllegalStateException( - "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]"); - } - // write the current generation to the index-latest file - final BytesReference genBytes; - try (BytesStreamOutput bStream = new BytesStreamOutput()) { - bStream.writeLong(newGen); - genBytes = bStream.bytes(); - } - logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); - writeAtomic(INDEX_LATEST_BLOB, genBytes, false); - // delete the N-2 index file if it exists, keep the previous one around as a backup - if (newGen - 2 >= 0) { - final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); - try { - blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile); - } catch (IOException e) { - logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile); + /** + * @param repositoryData RepositoryData to write + * @param expectedGen expected repository generation at the start of the operation + * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob + * @param listener completion listener + */ + protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + assert isReadOnly() == false; // can not write to a read only repository + final long currentGen = repositoryData.getGenId(); + if (currentGen != expectedGen) { + // the index file was updated by a concurrent operation, so we were operating on stale + // repository data + throw new RepositoryException(metadata.name(), + "concurrent modification of the index-N file, expected current generation [" + expectedGen + + "], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests"); } - } + final long newGen = currentGen + 1; + if (latestKnownRepoGen.get() >= newGen) { + throw new IllegalArgumentException( + "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already"); + } + // write the index file + final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); + logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); + writeAtomic(indexBlob, + BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); + final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen)); + if (newGen < latestKnownGen) { + // Don't mess up the index.latest blob + throw new IllegalStateException( + "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]"); + } + // write the current generation to the index-latest file + final BytesReference genBytes; + try (BytesStreamOutput bStream = new BytesStreamOutput()) { + bStream.writeLong(newGen); + genBytes = bStream.bytes(); + } + logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); + writeAtomic(INDEX_LATEST_BLOB, genBytes, false); + // delete the N-2 index file if it exists, keep the previous one around as a backup + if (newGen - 2 >= 0) { + final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); + try { + blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile); + } catch (IOException e) { + logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile); + } + } + return null; + }); } /** @@ -1438,7 +1450,7 @@ public void verify(String seed, DiscoveryNode localNode) { public String toString() { return "BlobStoreRepository[" + "[" + metadata.name() + - "], [" + blobStore() + ']' + + "], [" + blobStore.get() + ']' + ']'; } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 4a9430df7f686..6677a4faeea44 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.blobstore; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -43,7 +44,6 @@ import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; @@ -142,7 +142,7 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { // write to and read from a index file with no entries assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0)); final RepositoryData emptyData = RepositoryData.EMPTY; - repository.writeIndexGen(emptyData, emptyData.getGenId(), true); + writeIndexGen(repository, emptyData, emptyData.getGenId()); RepositoryData repoData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository); assertEquals(repoData, emptyData); assertEquals(repoData.getIndices().size(), 0); @@ -151,28 +151,29 @@ public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { // write to and read from an index file with snapshots but no indices repoData = addRandomSnapshotsToRepoData(repoData, false); - repository.writeIndexGen(repoData, repoData.getGenId(), true); + writeIndexGen(repository, repoData, repoData.getGenId()); assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository)); // write to and read from a index file with random repository data repoData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true); - repository.writeIndexGen(repoData, repoData.getGenId(), true); + writeIndexGen(repository, repoData, repoData.getGenId()); assertEquals(repoData, ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository)); } public void testIndexGenerationalFiles() throws Exception { final BlobStoreRepository repository = setupRepo(); + assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY); // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); - repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); + writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN); assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData)); assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); // adding more and writing to a new index generational file repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true); - repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); + writeIndexGen(repository, repositoryData, repositoryData.getGenId()); assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); @@ -180,24 +181,25 @@ public void testIndexGenerationalFiles() throws Exception { // removing a snapshot and writing to a new index generational file repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot( repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY); - repository.writeIndexGen(repositoryData, repositoryData.getGenId(), true); + writeIndexGen(repository, repositoryData, repositoryData.getGenId()); assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); } - public void testRepositoryDataConcurrentModificationNotAllowed() throws IOException { + public void testRepositoryDataConcurrentModificationNotAllowed() { final BlobStoreRepository repository = setupRepo(); // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); - repository.writeIndexGen(repositoryData, startingGeneration, true); + final PlainActionFuture future1 = PlainActionFuture.newFuture(); + repository.writeIndexGen(repositoryData, startingGeneration, true, future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created - expectThrows(RepositoryException.class, () -> repository.writeIndexGen( - repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId(), true)); + expectThrows(RepositoryException.class, + () -> writeIndexGen(repository, repositoryData.withGenId(startingGeneration + 1), repositoryData.getGenId())); } public void testBadChunksize() throws Exception { @@ -232,6 +234,12 @@ public void testFsRepositoryCompressDeprecated() { " See the breaking changes documentation for the next major version."); } + private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + repository.writeIndexGen(repositoryData, generation, true, future); + future.actionGet(); + } + private BlobStoreRepository setupRepo() { final Client client = client(); final Path location = ESIntegTestCase.randomRepoPath(node().settings());