diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java
index ffa45e3136b51..3facfa6319766 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/RepositoriesIT.java
@@ -21,10 +21,12 @@
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.RepositoryConflictException;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;

 import java.nio.file.Path;
@@ -295,4 +297,58 @@ public void testRepositoryConflict() throws Exception {
         logger.info("--> wait until snapshot deletion is finished");
         assertAcked(future.actionGet());
     }
+
+    public void testLeakedStaleIndicesAreDeletedBySubsequentDelete() throws Exception {
+        Client client = client();
+        Path repositoryPath = randomRepoPath();
+        final String repositoryName = "test-repo";
+        final String snapshot1Name = "test-snap-1";
+        final String snapshot2Name = "test-snap-2";
+
+        logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
+        createRepository(repositoryName, "mock", repositoryPath);
+
+        logger.info("--> creating index-1 and ingest data");
+        createIndex("test-idx-1");
+        ensureGreen();
+        for (int j = 0; j < 10; j++) {
+            indexDoc("test-idx-1", Integer.toString(10 + j), "foo", "bar" + 10 + j);
+        }
+        refresh();
+
+        logger.info("--> creating first snapshot");
+        createFullSnapshot(repositoryName, snapshot1Name);
+
+        logger.info("--> creating index-2 and ingest data");
+        createIndex("test-idx-2");
+        ensureGreen();
+        for (int j = 0; j < 10; j++) {
+            indexDoc("test-idx-2", Integer.toString(10 + j), "foo", "bar" + 10 + j);
+        }
+        refresh();
+
+        logger.info("--> creating second snapshot");
+        createFullSnapshot(repositoryName, snapshot2Name);
+
+        // Make the repository throw exceptions when trying to delete stale indices,
+        // so that the stale indices are left in the repository after the snapshot delete
+        final var repository = (MockRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class)
+            .repository(repositoryName);
+        repository.setFailOnDelete(true);
+
+        logger.info("--> delete the second snapshot");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();
+
+        // Make the repository work normally again
+        repository.setFailOnDelete(false);
+
+        // Deleting the first snapshot should also clean up the stale indices leaked by the previous delete
+        logger.info("--> delete snapshot one");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();
+
+        logger.info("--> check no leftover files");
+        assertFileCount(repositoryPath, 2); // just the index-N and index.latest blobs
+
+        logger.info("--> done");
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java
index c12685db77696..8630ed4cc768f 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/DeleteResult.java
@@ -38,4 +38,12 @@ public DeleteResult add(DeleteResult other) {
     public DeleteResult add(long blobs, long bytes) {
         return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
     }
+
+    public static DeleteResult of(long blobs, long bytes) {
+        if (blobs == 0 && bytes == 0) {
+            return ZERO;
+        } else {
+            return new DeleteResult(blobs, bytes);
+        }
+    }
 }
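A quick note on the DeleteResult.of factory added above: callers that accumulate counts can now materialize a result without allocating in the common all-zero case. A minimal sketch of the intended contract, using the existing blobsDeleted() and bytesDeleted() accessors (also used in the repository change below); the sketch itself is illustrative and not part of the change:

    // DeleteResult.of returns the shared ZERO constant when nothing was deleted...
    DeleteResult empty = DeleteResult.of(0, 0);
    assert empty == DeleteResult.ZERO;

    // ...and a freshly allocated instance otherwise.
    DeleteResult some = DeleteResult.of(3, 1024);
    assert some.blobsDeleted() == 3;
    assert some.bytesDeleted() == 1024;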
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 33e682354c9cc..fa699a76fadd7 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -72,6 +72,7 @@
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.core.CheckedConsumer;
@@ -390,6 +391,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

     private final ShardSnapshotTaskRunner shardSnapshotTaskRunner;

+    private final ThrottledTaskRunner staleBlobDeleteRunner;
+
     /**
      * Constructs new BlobStoreRepository
      * @param metadata The metadata for this repository including name and settings
@@ -430,6 +433,11 @@ protected BlobStoreRepository(
             this::doSnapshotShard,
             this::snapshotFile
         );
+        staleBlobDeleteRunner = new ThrottledTaskRunner(
+            "cleanupStaleBlobs",
+            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+            threadPool.executor(ThreadPool.Names.SNAPSHOT)
+        );
     }

     @Override
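The repository now owns a ThrottledTaskRunner, sized to the SNAPSHOT thread pool's maximum and executing on that same pool, so a delete can fan out cleanup work without flooding the executor: at most a pool's worth of tasks are submitted at once, and the rest wait inside the runner. A minimal usage sketch of the pattern relied on below (names here are illustrative; the key point is that the Releasable handed to the task must be closed to free the slot for the next queued task):

    // Sketch, assuming ThrottledTaskRunner's enqueueTask(ActionListener<Releasable>) contract:
    var runner = new ThrottledTaskRunner("demo-runner", 5, executor); // at most 5 concurrent tasks
    runner.enqueueTask(ActionListener.wrap(releasable -> {
        try (releasable) { // closing the ref lets the runner start the next queued task
            doSomeWork();  // hypothetical unit of work
        }
    }, e -> logger.warn("task could not be run", e)));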
@@ -1138,30 +1146,53 @@ private void cleanupStaleBlobs(
         RepositoryData newRepoData,
         ActionListener<DeleteResult> listener
     ) {
-        final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(2, ActionListener.wrap(deleteResults -> {
-            DeleteResult deleteResult = DeleteResult.ZERO;
-            for (DeleteResult result : deleteResults) {
-                deleteResult = deleteResult.add(result);
+        final var blobsDeleted = new AtomicLong();
+        final var bytesDeleted = new AtomicLong();
+        try (var listeners = new RefCountingListener(listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get())))) {
+
+            final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
+            if (staleRootBlobs.isEmpty() == false) {
+                staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
+                    try (ref) {
+                        logStaleRootLevelBlobs(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
+                        deleteFromContainer(blobContainer(), staleRootBlobs.iterator());
+                        for (final var staleRootBlob : staleRootBlobs) {
+                            bytesDeleted.addAndGet(rootBlobs.get(staleRootBlob).length());
+                        }
+                        blobsDeleted.addAndGet(staleRootBlobs.size());
+                    } catch (Exception e) {
+                        logger.warn(
+                            () -> format(
+                                "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
+                                metadata.name(),
+                                staleRootBlobs
+                            ),
+                            e
+                        );
+                    }
+                }));
             }
-            listener.onResponse(deleteResult);
-        }, listener::onFailure));
-
-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
-        final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
-        if (staleRootBlobs.isEmpty()) {
-            groupedListener.onResponse(DeleteResult.ZERO);
-        } else {
-            executor.execute(ActionRunnable.supply(groupedListener, () -> {
-                List<String> deletedBlobs = cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
-                return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
-            }));
-        }

-        final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
-        if (foundIndices.keySet().equals(survivingIndexIds)) {
-            groupedListener.onResponse(DeleteResult.ZERO);
-        } else {
-            executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
+            final var survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
+            for (final var indexEntry : foundIndices.entrySet()) {
+                final var indexSnId = indexEntry.getKey();
+                if (survivingIndexIds.contains(indexSnId)) {
+                    continue;
+                }
+                staleBlobDeleteRunner.enqueueTask(listeners.acquire(ref -> {
+                    try (ref) {
+                        logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
+                        final var deleteResult = indexEntry.getValue().delete(OperationPurpose.SNAPSHOT);
+                        blobsDeleted.addAndGet(deleteResult.blobsDeleted());
+                        bytesDeleted.addAndGet(deleteResult.bytesDeleted());
+                        logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
+                    } catch (IOException e) {
+                        logger.warn(() -> format("""
+                            [%s] index %s is no longer part of any snapshot in the repository, \
+                            but failed to clean up its index folder""", metadata.name(), indexSnId), e);
+                    }
+                }));
+            }
         }
     }
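Two aspects of this rewrite are worth spelling out. First, each task logs and swallows its own failures: cleanup is best-effort, and whatever is left behind gets removed by a later delete, which is precisely what the new integration test verifies. Second, the RefCountingListener completes the wrapped listener exactly once, after the try-with-resources block drops its initial reference and every acquired child listener has been released; the running totals live in AtomicLongs because the tasks may execute concurrently. A stripped-down sketch of the fan-out/fan-in shape (illustrative names):

    // Sketch: finalListener fires once all enqueued tasks have completed.
    try (var listeners = new RefCountingListener(finalListener)) {
        for (var task : tasks) { // hypothetical collection of cleanup actions
            runner.enqueueTask(listeners.acquire(ref -> {
                try (ref) {
                    task.run();
                } catch (Exception e) {
                    logger.warn("best-effort cleanup failed, a later delete retries it", e);
                }
            }));
        }
    } // exiting the block releases the initial ref; the listener completes when the count hits zero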
@@ -1171,8 +1202,8 @@ private void cleanupStaleBlobs(
      * TODO: Add shard level cleanups
      * TODO: Add unreferenced index metadata cleanup
      * <ul>
-     *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
-     *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
+     *     <li>Deleting stale indices</li>
+     *     <li>Deleting unreferenced root level blobs</li>
      * </ul>
      * @param repositoryStateId     Current repository state id
      * @param repositoryMetaVersion version of the updated repository metadata to write
@@ -1250,70 +1281,25 @@ private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<S
-    private List<String> cleanupStaleRootFiles(
-        long previousGeneration,
-        Collection<SnapshotId> deletedSnapshots,
-        List<String> blobsToDelete
-    ) {
-        if (blobsToDelete.isEmpty()) {
-            return blobsToDelete;
-        }
-        try {
-            if (logger.isInfoEnabled()) {
-                // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
-                // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
-                // delete would also log a confusing INFO message about "stale blobs".
-                final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
-                    .flatMap(
-                        snapshotId -> Stream.of(
-                            GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
-                            SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
-                            INDEX_FILE_PREFIX + previousGeneration
-                        )
+    private void logStaleRootLevelBlobs(long previousGeneration, Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
+        if (logger.isInfoEnabled()) {
+            // If we're running root level cleanup as part of a snapshot delete we should not log the snapshot- and global metadata
+            // blobs associated with the just deleted snapshots as they are expected to exist and not stale. Otherwise every snapshot
+            // delete would also log a confusing INFO message about "stale blobs".
+            final Set<String> blobNamesToIgnore = deletedSnapshots.stream()
+                .flatMap(
+                    snapshotId -> Stream.of(
+                        GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
+                        SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()),
+                        INDEX_FILE_PREFIX + previousGeneration
                     )
-                    .collect(Collectors.toSet());
-                final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
-                if (blobsToLog.isEmpty() == false) {
-                    logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
-                }
-            }
-            deleteFromContainer(blobContainer(), blobsToDelete.iterator());
-            return blobsToDelete;
-        } catch (Exception e) {
-            logger.warn(
-                () -> format(
-                    "[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
-                    metadata.name(),
-                    blobsToDelete
-                ),
-                e
-            );
-        }
-        return Collections.emptyList();
-    }
-
-    private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
-        DeleteResult deleteResult = DeleteResult.ZERO;
-        for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
-            final String indexSnId = indexEntry.getKey();
-            try {
-                if (survivingIndexIds.contains(indexSnId) == false) {
-                    logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
-                    deleteResult = deleteResult.add(indexEntry.getValue().delete(OperationPurpose.SNAPSHOT));
-                    logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
-                }
-            } catch (Exception e) {
-                logger.warn(
-                    () -> format(
-                        "[%s] index %s is no longer part of any snapshot in the repository, " + "but failed to clean up its index folder",
-                        metadata.name(),
-                        indexSnId
-                    ),
-                    e
-                );
+                )
+                .collect(Collectors.toSet());
+            final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
+            if (blobsToLog.isEmpty() == false) {
+                logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
             }
         }
-        return deleteResult;
     }

     @Override
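With the deletion work inlined into the throttled tasks, the two old helpers are dissolved rather than moved: cleanupStaleRootFiles shrinks to the log-only logStaleRootLevelBlobs above, and cleanupStaleIndices becomes the per-index tasks, whose counts feed the shared counters instead of being summed into a returned DeleteResult. A sketch of the accounting change, reusing names from the diff:

    // Each concurrent task adds to shared counters as it deletes...
    final var blobsDeleted = new AtomicLong();
    final var bytesDeleted = new AtomicLong();
    // ...and the single DeleteResult is built only when the last reference is released.
    ActionListener<Void> refs = listener.map(ignored -> DeleteResult.of(blobsDeleted.get(), bytesDeleted.get()));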
Cleaning them up", metadata.name(), blobsToLog); } } - return deleteResult; } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index d5b59ef3274ea..182526ea0a176 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -170,6 +170,8 @@ public long getFailureCount() { private volatile boolean blocked = false; + private volatile boolean failOnDelete = false; + public MockRepository( RepositoryMetadata metadata, Environment environment, @@ -352,6 +354,13 @@ public void setBlockOnceOnReadSnapshotInfoIfAlreadyBlocked() { blockOnceOnReadSnapshotInfo.set(true); } + /** + * Sets the fail-on-delete flag, which if {@code true} throws an exception when deleting a blob. + */ + public void setFailOnDelete(boolean failOnDelete) { + this.failOnDelete = failOnDelete; + } + public boolean blocked() { return blocked; } @@ -550,6 +559,9 @@ public InputStream readBlob(OperationPurpose purpose, String name, long position @Override public DeleteResult delete(OperationPurpose purpose) throws IOException { + if (failOnDelete) { + throw new IOException("simulated delete failure"); + } DeleteResult deleteResult = DeleteResult.ZERO; for (BlobContainer child : children(purpose).values()) { deleteResult = deleteResult.add(child.delete(purpose));