From 9a4860501883868fe97930879c69d7226ee3dcd5 Mon Sep 17 00:00:00 2001 From: Piyush Daftary Date: Wed, 16 Feb 2022 01:36:53 -0800 Subject: [PATCH] Parallelize stale blobs deletion during snapshot delete Signed-off-by: Piyush Daftary --- .../opensearch/snapshots/RepositoriesIT.java | 54 ++++++++++++++ .../blobstore/BlobStoreRepository.java | 71 ++++++++++++++++--- 2 files changed, 117 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java index 662e97dd84fda..bdd5941aebb3d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java @@ -35,6 +35,8 @@ import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.Metadata; @@ -46,6 +48,7 @@ import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryException; import org.opensearch.repositories.RepositoryVerificationException; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.ThreadPool; @@ -198,6 +201,57 @@ public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Excep logger.info("--> done"); } + public void testSnapshotShardBlobDelete() throws Exception { + Client client = client(); + Path repositoryPath = randomRepoPath(); + final String repositoryName = "test-repo"; + final String firstSnapshot = "first-snapshot"; + final String secondSnapshot = "second-snapshot"; + final String indexName = "test-idx"; + + logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); + createRepository( + "test-repo", + "mock", + Settings.builder().put("location", repositoryPath).put(BlobStoreRepository.MAX_STALE_BLOB_DELETE_BATCH_SIZE.getKey(), 10) + ); + + logger.info("--> creating index-0 and ingest data"); + createIndex(indexName); + ensureGreen(); + for (int j = 0; j < 10; j++) { + index(indexName, "_doc", Integer.toString(10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating first snapshot"); + createFullSnapshot(repositoryName, firstSnapshot); + + int numberOfFiles = numberOfFiles(repositoryPath); + + logger.info("--> adding some more documents to test index"); + for (int j = 0; j < 10; ++j) { + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 100; ++i) { + bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i)); + } + client().bulk(bulkRequest).get(); + } + refresh(); + + logger.info("--> creating second snapshot"); + createFullSnapshot(repositoryName, secondSnapshot); + + // Delete second snapshot + logger.info("--> delete second snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get(); + + logger.info("--> make sure that number of files is back to what it was when the first snapshot was made"); + assertFileCount(repositoryPath, numberOfFiles); + + logger.info("--> done"); + } + private RepositoryMetadata findRepository(List repositories, String name) { for (RepositoryMetadata repository : repositories) { if (repository.name().equals(name)) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 7d6cdef76198f..25bc810d48485 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -233,6 +233,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.NodeScope ); + /** + * Setting to set batch size of stale blobs to be deleted. + */ + public static final Setting MAX_STALE_BLOB_DELETE_BATCH_SIZE = Setting.intSetting( + "max_stale_blob_delete_batch_size", + 1000, + Setting.Property.NodeScope + ); + /** * Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a * url-repository. @@ -241,6 +250,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp protected final boolean supportURLRepo; + private final int maxStaleBlobDeleteBatch; + private final boolean compress; private final boolean cacheRepositoryData; @@ -356,6 +367,7 @@ protected BlobStoreRepository( readOnly = metadata.settings().getAsBoolean("readonly", false); cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings()); bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes()); + maxStaleBlobDeleteBatch = MAX_STALE_BLOB_DELETE_BATCH_SIZE.get(metadata.settings()); } @Override @@ -900,15 +912,58 @@ private void asyncCleanupUnlinkedShardLevelBlobs( listener.onResponse(null); return; } - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { - try { - deleteFromContainer(blobContainer(), filesToDelete); - l.onResponse(null); - } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e); - throw e; + + final BlockingQueue> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(); + final List partition = new ArrayList<>(); + + try { + for (String key : filesToDelete) { + partition.add(key); + if (maxStaleBlobDeleteBatch == partition.size()) { + staleFilesToDeleteInBatch.add(new ArrayList<>(partition)); + partition.clear(); + } } - })); + if (partition.isEmpty() == false) { + staleFilesToDeleteInBatch.add(new ArrayList<>(partition)); + } + + final GroupedActionListener groupedListener = new GroupedActionListener<>( + ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure), + staleFilesToDeleteInBatch.size() + ); + + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), filesToDelete.size()); + for (int i = 0; i < workers; ++i) { + executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener); + } + + } catch (Exception e) { + // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. + // Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations + // bubbling up and breaking the snapshot functionality. + assert false : e; + logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale blobs", snapshotIds), e); + } + } + + private void executeStaleShardDelete(BlockingQueue> staleFilesToDeleteInBatch, GroupedActionListener listener) + throws InterruptedException { + List filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS); + if (filesToDelete != null) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { + try { + deleteFromContainer(blobContainer(), filesToDelete); + l.onResponse(null); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e); + throw e; + } + })); + + executeStaleShardDelete(staleFilesToDeleteInBatch, listener); + } } // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.