From 92458fcbcca2d36a2a16f98e1f91d739bbebbb98 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 19 Sep 2023 06:24:46 +0100 Subject: [PATCH] Small cleanup in BlobStoreRepository#finalizeSnapshot (#99635) Reorders the operations into their logical order and adds a few TODOs for gaps that this cleanup exposes. --- .../blobstore/BlobStoreSizeLimitIT.java | 9 +- .../blobstore/BlobStoreRepository.java | 211 +++++++++++------- 2 files changed, 138 insertions(+), 82 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreSizeLimitIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreSizeLimitIT.java index 73ad2737f1f10..ffe6133e034bc 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreSizeLimitIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreSizeLimitIT.java @@ -14,11 +14,14 @@ import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; +import org.elasticsearch.snapshots.SnapshotException; import org.hamcrest.Matchers; import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class BlobStoreSizeLimitIT extends AbstractSnapshotIntegTestCase { @@ -32,7 +35,11 @@ public void testBlobStoreSizeIsLimited() throws Exception { ); final List snapshotNames = createNSnapshots(repoName, maxSnapshots); final ActionFuture failingSnapshotFuture = startFullSnapshot(repoName, "failing-snapshot"); - final RepositoryException repositoryException = expectThrows(RepositoryException.class, failingSnapshotFuture::actionGet); + final SnapshotException snapshotException = expectThrows(SnapshotException.class, failingSnapshotFuture::actionGet); + assertThat(snapshotException.getRepositoryName(), equalTo(repoName)); + assertThat(snapshotException.getSnapshotName(), equalTo("failing-snapshot")); + assertThat(snapshotException.getCause(), instanceOf(RepositoryException.class)); + final RepositoryException repositoryException = (RepositoryException) snapshotException.getCause(); assertThat( repositoryException.getMessage(), Matchers.endsWith( diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index afe5562103240..436d34bc39c21 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -1326,44 +1327,107 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte // when writing the index-${N} to each shard directory. final IndexVersion repositoryMetaVersion = finalizeSnapshotContext.repositoryMetaVersion(); final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); - final Consumer onUpdateFailure = e -> finalizeSnapshotContext.onFailure( - new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e) - ); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - final ListenableFuture repoDataListener = new ListenableFuture<>(); - getRepositoryData(repoDataListener); - repoDataListener.addListener(ActionListener.wrap(existingRepositoryData -> { - final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size(); - if (existingSnapshotCount >= maxSnapshotCount) { - finalizeSnapshotContext.onFailure( - new RepositoryException( + record MetadataWriteResult( + RepositoryData existingRepositoryData, + Map indexMetas, + Map indexMetaIdentifiers + ) {} + + record RootBlobUpdateResult(RepositoryData oldRepositoryData, RepositoryData newRepositoryData) {} + + SubscribableListener + + // Get the current RepositoryData + .newForked(this::getRepositoryData) + + // Identify and write the missing metadata + .andThen((l, existingRepositoryData) -> { + final int existingSnapshotCount = existingRepositoryData.getSnapshotIds().size(); + if (existingSnapshotCount >= maxSnapshotCount) { + throw new RepositoryException( metadata.name(), - "Cannot add another snapshot to this repository as it " - + "already contains [" + "Cannot add another snapshot to this repository as it already contains [" + existingSnapshotCount + "] snapshots and is configured to hold up to [" + maxSnapshotCount + "] snapshots only." - ) - ); - return; - } + ); + } - final Map indexMetas; - final Map indexMetaIdentifiers; - if (writeIndexGens) { - indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); - indexMetas = ConcurrentCollections.newConcurrentMap(); - } else { - indexMetas = null; - indexMetaIdentifiers = null; - } + final MetadataWriteResult metadataWriteResult; + if (writeIndexGens) { + metadataWriteResult = new MetadataWriteResult( + existingRepositoryData, + ConcurrentCollections.newConcurrentMap(), + ConcurrentCollections.newConcurrentMap() + ); + } else { + metadataWriteResult = new MetadataWriteResult(existingRepositoryData, null, null); + } + + try (var allMetaListeners = new RefCountingListener(l.map(ignored -> metadataWriteResult))) { + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method + // will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version + // of the index or global metadata will be compatible with the segments written in this snapshot as well. + // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a + // way that decrements the generation it points at + + // Write global metadata + final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata(); + executor.execute( + ActionRunnable.run( + allMetaListeners.acquire(), + () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress) + ) + ); + + // Write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> { + final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); + if (writeIndexGens) { + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); + metadataWriteResult.indexMetaIdentifiers().put(identifiers, metaUUID); + } // else this task was largely a no-op - TODO no need to fork in that case + metadataWriteResult.indexMetas().put(index, identifiers); + } else { + INDEX_METADATA_FORMAT.write( + clusterMetadata.index(index.getName()), + indexContainer(index), + snapshotId.getUUID(), + compress + ); + } + })); + } + + // Write the SnapshotInfo blob + executor.execute( + ActionRunnable.run( + allMetaListeners.acquire(), + () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) + ) + ); + + // TODO fail fast if any metadata write fails + // TODO clean up successful metadata writes on failure (needs care, we must not clobber another node concurrently + // finalizing the same snapshot: we can only clean up after removing the failed snapshot from the cluster state) + } + }) - try (var allMetaListeners = new RefCountingListener(ActionListener.wrap(v -> { + // Update the root blob + .andThen((l, metadataWriteResult) -> { + // unlikely, but in theory we could still be on the thread which called finalizeSnapshot - TODO must fork to SNAPSHOT here final String slmPolicy = slmPolicy(snapshotInfo); final SnapshotDetails snapshotDetails = new SnapshotDetails( snapshotInfo.state(), @@ -1372,64 +1436,41 @@ public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotConte snapshotInfo.endTime(), slmPolicy ); + final var existingRepositoryData = metadataWriteResult.existingRepositoryData(); writeIndexGen( - existingRepositoryData.addSnapshot(snapshotId, snapshotDetails, shardGenerations, indexMetas, indexMetaIdentifiers), + existingRepositoryData.addSnapshot( + snapshotId, + snapshotDetails, + shardGenerations, + metadataWriteResult.indexMetas(), + metadataWriteResult.indexMetaIdentifiers() + ), repositoryStateId, repositoryMetaVersion, finalizeSnapshotContext::updatedClusterState, - ActionListener.wrap(newRepoData -> { - finalizeSnapshotContext.onResponse(newRepoData); - cleanupOldMetadata(existingRepositoryData, newRepoData, finalizeSnapshotContext, snapshotInfo, writeShardGens); - }, onUpdateFailure) + l.map(newRepositoryData -> new RootBlobUpdateResult(existingRepositoryData, newRepositoryData)) ); - }, onUpdateFailure))) { - - // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method - // will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of - // the index or global metadata will be compatible with the segments written in this snapshot as well. - // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way - // that decrements the generation it points at - final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata(); - // Write Global MetaData - executor.execute( - ActionRunnable.run( - allMetaListeners.acquire(), - () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress) - ) + // NB failure of writeIndexGen doesn't guarantee the update failed, so we cannot safely clean anything up on failure + }) + + // Report success, then clean up. + .andThen((l, rootBlobUpdateResult) -> { + l.onResponse(rootBlobUpdateResult.newRepositoryData()); + cleanupOldMetadata( + rootBlobUpdateResult.oldRepositoryData(), + rootBlobUpdateResult.newRepositoryData(), + finalizeSnapshotContext, + snapshotInfo, + writeShardGens ); + }) - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { - executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> { - final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); - if (writeIndexGens) { - final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); - String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); - if (metaUUID == null) { - // We don't yet have this version of the metadata so we write it - metaUUID = UUIDs.base64UUID(); - INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress); - indexMetaIdentifiers.put(identifiers, metaUUID); - } - indexMetas.put(index, identifiers); - } else { - INDEX_METADATA_FORMAT.write( - clusterMetadata.index(index.getName()), - indexContainer(index), - snapshotId.getUUID(), - compress - ); - } - })); - } - executor.execute( - ActionRunnable.run( - allMetaListeners.acquire(), - () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress) - ) - ); - } - }, onUpdateFailure)); + // Finally subscribe the context as the listener, wrapping exceptions if needed + .addListener( + finalizeSnapshotContext.delegateResponse( + (l, e) -> l.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)) + ) + ); } // Delete all old shard gen and root level index blobs that aren't referenced any longer as a result from moving to updated @@ -1471,13 +1512,21 @@ private void cleanupOldMetadata( } } } + if (toDelete.isEmpty() == false) { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { deleteFromContainer(blobContainer(), toDelete.iterator()); - } catch (Exception e) { + } + + @Override + public void onFailure(Exception e) { logger.warn("Failed to clean up old metadata blobs", e); - } finally { + } + + @Override + public void onAfter() { finalizeSnapshotContext.onDone(snapshotInfo); } });