diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
index e72110f4c4efd..717627f864146 100644
--- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java
@@ -35,6 +35,8 @@
 import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
 import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
 import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
+import org.opensearch.action.bulk.BulkRequest;
+import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.metadata.Metadata;
@@ -46,6 +48,7 @@
 import org.opensearch.repositories.RepositoriesService;
 import org.opensearch.repositories.RepositoryException;
 import org.opensearch.repositories.RepositoryVerificationException;
+import org.opensearch.repositories.blobstore.BlobStoreRepository;
 import org.opensearch.snapshots.mockstore.MockRepository;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.threadpool.ThreadPool;
@@ -328,4 +331,125 @@ public void testRepositoryVerification() throws Exception {
             assertThat(ex.getMessage(), containsString("is not shared"));
         }
     }
+
+    public void testSnapshotShardBlobDelete() throws Exception {
+        Client client = client();
+        Path repositoryPath = randomRepoPath();
+        final String repositoryName = "test-repo";
+        final String firstSnapshot = "first-snapshot";
+        final String secondSnapshot = "second-snapshot";
+        final String indexName = "test-idx";
+
+        logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
+        int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000);
+        createRepository(
+            repositoryName,
+            "mock",
+            Settings.builder()
+                .put("location", repositoryPath)
+                .put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize)
+        );
+
+        logger.info("--> creating index and ingesting data");
+        createIndex(indexName);
+        ensureGreen();
+        for (int j = 0; j < randomIntBetween(1, 1000); j++) {
+            index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j);
+        }
+        refresh();
+
+        logger.info("--> creating first snapshot");
+        createFullSnapshot(repositoryName, firstSnapshot);
+
+        int numberOfFiles = numberOfFiles(repositoryPath);
+
+        logger.info("--> adding some more documents to the test index");
+        for (int j = 0; j < randomIntBetween(100, 10000); ++j) {
+            final BulkRequest bulkRequest = new BulkRequest();
+            for (int i = 0; i < randomIntBetween(100, 1000); ++i) {
+                bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
+            }
+            client().bulk(bulkRequest).get();
+        }
+        refresh();
+
+        logger.info("--> creating second snapshot");
+        createFullSnapshot(repositoryName, secondSnapshot);
+
+        // Delete the second snapshot
+        logger.info("--> delete second snapshot");
+        client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();
+
+        logger.info("--> make sure that the number of files is back to what it was when the first snapshot was made");
+        assertFileCount(repositoryPath, numberOfFiles);
+
+        logger.info("--> done");
+    }
+
+    public void testSnapshotShardBlobDeletionRepositoryThrowingError() throws Exception {
+        Client client = client();
+        Path repositoryPath = randomRepoPath();
+        final String repositoryName = "test-repo";
repositoryName = "test-repo"; + final String firstSnapshot = "first-snapshot"; + final String secondSnapshot = "second-snapshot"; + final String indexName = "test-idx"; + + logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); + int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000); + createRepository( + "test-repo", + "mock", + Settings.builder() + .put("location", repositoryPath) + .put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize) + ); + + logger.info("--> creating index-0 and ingest data"); + createIndex(indexName); + ensureGreen(); + for (int j = 0; j < randomIntBetween(1, 1000); j++) { + index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j); + } + refresh(); + + logger.info("--> creating first snapshot"); + createFullSnapshot(repositoryName, firstSnapshot); + + logger.info("--> adding some more documents to test index"); + for (int j = 0; j < randomIntBetween(100, 1000); ++j) { + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < randomIntBetween(100, 1000); ++i) { + bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i)); + } + client().bulk(bulkRequest).get(); + } + refresh(); + + logger.info("--> creating second snapshot"); + createFullSnapshot(repositoryName, secondSnapshot); + + // Make repository to throw exception when trying to delete stale snapshot shard blobs + String clusterManagerNode = internalCluster().getMasterName(); + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo")) + .setThrowExceptionWhileDelete(true); + + // Delete second snapshot + logger.info("--> delete second snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get(); + + // Make repository to work normally + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo")) + .setThrowExceptionWhileDelete(false); + + // This snapshot should delete last snapshot's residual stale shard blobs as well + logger.info("--> delete first snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, firstSnapshot).get(); + + // Expect two files to remain in the repository: + // (1) index-(N+1) + // (2) index-latest + assertFileCount(repositoryPath, 2); + + logger.info("--> done"); + } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 6eaec491c8177..c36d92abcf498 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -147,6 +147,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -235,6 +236,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + /** + * Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion. 
+     * For optimal performance, the value should be equal to (or close to) the maximum number of keys the repository can delete in a single operation.
+     * Most cloud storage services support deleting up to 1000 keys per operation, hence the default value of 1000.
+     */
+    public static final Setting<Integer> MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
+        "max_snapshot_shard_blob_delete_batch_size",
+        1000, // default maximum batch size for stale snapshot shard blob deletion
+        Setting.Property.NodeScope
+    );
+
     /**
      * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
      * url-repository.
@@ -243,6 +255,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     protected final boolean supportURLRepo;
 
+    private final int maxShardBlobDeleteBatch;
+
     private final boolean compress;
 
     private final boolean cacheRepositoryData;
@@ -358,6 +372,7 @@ protected BlobStoreRepository(
         readOnly = metadata.settings().getAsBoolean("readonly", false);
         cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
         bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
+        maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
     }
 
     @Override
@@ -902,15 +917,57 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
             listener.onResponse(null);
             return;
         }
-        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
-            try {
-                deleteFromContainer(blobContainer(), filesToDelete);
-                l.onResponse(null);
-            } catch (Exception e) {
-                logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
-                throw e;
+
+        try {
+            AtomicInteger counter = new AtomicInteger();
+            Collection<List<String>> subList = filesToDelete.stream()
+                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
+                .values();
+            final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);
+
+            final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
+                ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
+                staleFilesToDeleteInBatch.size()
+            );
+
+            // Start at most as many workers as will fit into the snapshot pool at once
+            final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
+            for (int i = 0; i < workers; ++i) {
+                executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
             }
-        }));
+
+        } catch (Exception e) {
+            // TODO: We shouldn't be blanket catching and suppressing all exceptions here; instead we should handle them safely upstream.
+            // Currently this catch exists as a stopgap solution to keep unexpected runtime exceptions from implementations
+            // bubbling up and breaking the snapshot functionality.
+            assert false : e;
+            logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale shard blobs", snapshotIds), e);
+            listener.onFailure(e);
+        }
+    }
+
+    private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
+        throws InterruptedException {
+        List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
+        if (filesToDelete != null) {
+            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
+                try {
+                    deleteFromContainer(blobContainer(), filesToDelete);
+                    l.onResponse(null);
+                } catch (Exception e) {
+                    logger.warn(
+                        () -> new ParameterizedMessage(
+                            "[{}] Failed to delete the following blobs during snapshot delete: {}",
+                            metadata.name(),
+                            filesToDelete
+                        ),
+                        e
+                    );
+                    l.onFailure(e);
+                }
+                executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+            }));
+        }
     }
 
     // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java
index 4e641f9505e3e..fc8ff3ce841e7 100644
--- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java
@@ -474,6 +474,9 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
             if (blockOnDeleteIndexN && blobNames.stream().anyMatch(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))) {
                 blockExecutionAndMaybeWait("index-{N}");
             }
+            if (setThrowExceptionWhileDelete) {
+                throw new IOException("Random exception");
+            }
             super.deleteBlobsIgnoringIfNotExists(blobNames);
         }
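
Side note for reviewers: the batching scheme in `asyncCleanupUnlinkedShardLevelBlobs` is easy to model outside OpenSearch. The sketch below is a minimal, self-contained Java program (the class and method names `BatchDeleteModel` and `deleteBatch` are illustrative, not part of this change) showing the same two ideas: partitioning a flat list into batches of at most `maxShardBlobDeleteBatch` elements via `Collectors.groupingBy` over an incrementing counter, and draining the resulting queue with a bounded number of workers.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BatchDeleteModel {

    public static void main(String[] args) throws InterruptedException {
        // 2500 fake blob names with a batch size of 1000 -> batches of 1000, 1000 and 500.
        List<String> filesToDelete = IntStream.range(0, 2500)
            .mapToObj(i -> "indices/0/0/snap-" + i + ".dat")
            .collect(Collectors.toList());
        int maxShardBlobDeleteBatch = 1000;

        // Same partitioning trick as the PR: an incrementing counter divided by the
        // batch size assigns each element to a bucket of at most batch-size elements.
        AtomicInteger counter = new AtomicInteger();
        Collection<List<String>> batches = filesToDelete.stream()
            .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
            .values();
        BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(batches);

        // Bounded workers, mirroring Math.min(snapshot pool max, number of batches).
        int workers = Math.min(4, queue.size());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                List<String> batch;
                // poll() returns null once the queue is drained, which ends the worker.
                while ((batch = queue.poll()) != null) {
                    deleteBatch(batch);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    // Stand-in for deleteFromContainer(blobContainer(), filesToDelete).
    private static void deleteBatch(List<String> batch) {
        System.out.println(Thread.currentThread().getName() + " deleting " + batch.size() + " blobs");
    }
}
```

One difference worth noting: the sketch drains the queue with a `while` loop, whereas the PR has each completed task resubmit `executeStaleShardDelete` to the snapshot pool, so a worker yields its thread between batches instead of holding it until the queue is empty.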
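Similarly, a hedged usage sketch for the new setting: since `MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE` is read from the repository metadata settings in the `BlobStoreRepository` constructor, it can be supplied when registering a repository. Assuming a `Client client` in scope as in the tests above (the repository name, type, and location are placeholders for illustration):

```java
// Register an fs repository whose stale shard blobs are deleted in batches of 500.
client.admin()
    .cluster()
    .preparePutRepository("my-backups")
    .setType("fs")
    .setSettings(
        Settings.builder()
            .put("location", "/mnt/backups/my-backups")
            .put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), 500)
    )
    .get();
```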