diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java index c726efe3d48e9..39fdf77d1baea 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java @@ -43,8 +43,10 @@ import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryException; import org.opensearch.repositories.RepositoryVerificationException; +import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; @@ -124,6 +126,63 @@ public void testRepositoryCreation() throws Exception { assertThat(repositoriesResponse.repositories().size(), equalTo(0)); } + public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception { + Client client = client(); + Path repositoryPath = randomRepoPath(); + final String repositoryName = "test-repo"; + final String snapshot1Name = "test-snap-1"; + final String snapshot2Name = "test-snap-2"; + + logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); + createRepository(repositoryName, "mock", repositoryPath); + + int numberOfFiles = numberOfFiles(repositoryPath); + + logger.info("--> creating index-1 and ingest data"); + createIndex("test-idx-1"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating first snapshot"); + createFullSnapshot(repositoryName, snapshot1Name); + + logger.info("--> creating index-2 and ingest data"); + createIndex("test-idx-2"); + ensureGreen(); + for (int j = 0; j < 10; j++) { + index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating second snapshot"); + createFullSnapshot(repositoryName, snapshot2Name); + + // Make repository to throw exception when trying to delete stale indices + // This will make sure stale indices stays in repository after snapshot delete + String masterNode = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")). + setThrowExceptionWhileDelete(true); + + logger.info("--> delete the second snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get(); + + // Make repository to work normally + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")). + setThrowExceptionWhileDelete(false); + + // This snapshot should delete last snapshot's residual stale indices as well + logger.info("--> delete snapshot one"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get(); + + logger.info("--> make sure that number of files is back to what it was when the first snapshot was made"); + assertFileCount(repositoryPath, numberOfFiles + 2); + + logger.info("--> done"); + } + private RepositoryMetadata findRepository(List repositories, String name) { for (RepositoryMetadata repository : repositories) { if (repository.name().equals(name)) { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 1fa409a7a3011..18162b103eb96 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -945,7 +945,7 @@ private void cleanupStaleBlobs(Collection deletedSnapshots, Map cleanupStaleIndices(foundIndices, survivingIndexIds))); + cleanupStaleIndices(foundIndices, survivingIndexIds, groupedListener); } } @@ -1053,23 +1053,30 @@ private List cleanupStaleRootFiles(long previousGeneration, Collection foundIndices, Set survivingIndexIds) { - DeleteResult deleteResult = DeleteResult.ZERO; + private void cleanupStaleIndices(Map foundIndices, Set survivingIndexIds, + GroupedActionListener listener) { + final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { + DeleteResult deleteResult = DeleteResult.ZERO; + for (DeleteResult result : deleteResults) { + deleteResult = deleteResult.add(result); + } + listener.onResponse(deleteResult); + }, listener::onFailure), foundIndices.size() - survivingIndexIds.size()); + try { + final BlockingQueue> staleIndicesToDelete = new LinkedBlockingQueue<>(); for (Map.Entry indexEntry : foundIndices.entrySet()) { - final String indexSnId = indexEntry.getKey(); - try { - if (survivingIndexIds.contains(indexSnId) == false) { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); - deleteResult = deleteResult.add(indexEntry.getValue().delete()); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - } - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage( - "[{}] index {} is no longer part of any snapshots in the repository, " + - "but failed to clean up their index folders", metadata.name(), indexSnId), e); + if (survivingIndexIds.contains(indexEntry.getKey()) == false) { + staleIndicesToDelete.put(indexEntry); } } + + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), + foundIndices.size() - survivingIndexIds.size()); + for (int i = 0; i < workers; ++i) { + executeOneStaleIndexDelete(staleIndicesToDelete, groupedListener); + } } catch (Exception e) { // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. // Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations @@ -1077,7 +1084,33 @@ private DeleteResult cleanupStaleIndices(Map foundIndices assert false : e; logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e); } - return deleteResult; + } + + private void executeOneStaleIndexDelete(BlockingQueue> staleIndicesToDelete, + GroupedActionListener listener) throws InterruptedException { + Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); + if (indexEntry == null) { + return; + } else { + final String indexSnId = indexEntry.getKey(); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + try { + DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete(); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); + executeOneStaleIndexDelete(staleIndicesToDelete, listener); + return staleIndexDeleteResult; + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] index {} is no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders", metadata.name(), indexSnId), e); + return DeleteResult.ZERO; + } catch (Exception e) { + assert false : e; + logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); + return DeleteResult.ZERO; + } + })); + } } @Override diff --git a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java index 06280456831df..89442cef7d86a 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/mockstore/MockRepository.java @@ -154,6 +154,7 @@ public long getFailureCount() { private volatile boolean throwReadErrorAfterUnblock = false; private volatile boolean blocked = false; + private volatile boolean setThrowExceptionWhileDelete; public MockRepository(RepositoryMetadata metadata, Environment environment, NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, @@ -257,6 +258,10 @@ public void setFailReadsAfterUnblock(boolean failReadsAfterUnblock) { this.failReadsAfterUnblock = failReadsAfterUnblock; } + public void setThrowExceptionWhileDelete(boolean throwError) { + setThrowExceptionWhileDelete = throwError; + } + public boolean blocked() { return blocked; } @@ -425,6 +430,9 @@ public InputStream readBlob(String name, long position, long length) throws IOEx @Override public DeleteResult delete() throws IOException { DeleteResult deleteResult = DeleteResult.ZERO; + if (setThrowExceptionWhileDelete) { + throw new IOException("Random exception"); + } for (BlobContainer child : children().values()) { deleteResult = deleteResult.add(child.delete()); }