From a48249084d032d46f4a7e8af64b2a863eae4cabc Mon Sep 17 00:00:00 2001 From: AmiStrn Date: Thu, 18 Feb 2021 17:03:33 +0200 Subject: [PATCH] Make snapshot deletion faster The delete snapshot task takes longer than expected. A major reason for this is that the (often many) stale indices are deleted iteratively. In this commit we change the deletion to be concurrent using the SNAPSHOT threadpool. Notice that in order to avoid putting too many delete tasks on the threadpool queue a similar methodology was used as in `executeOneFileSnapshot()`. This is due to the fact that the threadpool should allow other tasks to use this threadpool without too much of a delay. fixes issue #61513 from Elasticsearch project --- .../blobstore/BlobStoreRepository.java | 61 +++++++++++++------ 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9e1f4272ee373..b82da1818e768 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -929,10 +929,13 @@ private void cleanupStaleBlobs(Collection deletedSnapshots, Map survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet()); - if (foundIndices.keySet().equals(survivingIndexIds)) { + BlockingQueue> staleIndicesToDelete = foundIndices.entrySet().stream() + .filter(foundIndexEntry -> !survivingIndexIds.contains(foundIndexEntry.getKey())) + .collect(Collectors.toCollection(LinkedBlockingQueue::new)); + if (staleIndicesToDelete.isEmpty()) { groupedListener.onResponse(DeleteResult.ZERO); } else { - executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds))); + cleanupStaleIndices(groupedListener, staleIndicesToDelete); } } @@ -1040,22 +1043,20 @@ private List cleanupStaleRootFiles(long previousGeneration, Collection foundIndices, Set survivingIndexIds) { - DeleteResult deleteResult = DeleteResult.ZERO; + private void cleanupStaleIndices(GroupedActionListener listener, BlockingQueue> indicesToDelete) { + final GroupedActionListener groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> { + DeleteResult deleteResult = DeleteResult.ZERO; + for (DeleteResult result : deleteResults) { + deleteResult = deleteResult.add(result); + } + listener.onResponse(deleteResult); + }, listener::onFailure), indicesToDelete.size()); + try { - for (Map.Entry indexEntry : foundIndices.entrySet()) { - final String indexSnId = indexEntry.getKey(); - try { - if (survivingIndexIds.contains(indexSnId) == false) { - logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); - deleteResult = deleteResult.add(indexEntry.getValue().delete()); - logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - } - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage( - "[{}] index {} is no longer part of any snapshots in the repository, " + - "but failed to clean up their index folders", metadata.name(), indexSnId), e); - } + // Start as many workers as fit into the snapshot pool at once at the most + final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indicesToDelete.size()); + for (int i = 0; i < workers; ++i) { + executeOneStaleIndexDelete(groupedListener, indicesToDelete); } } catch (Exception e) { // TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream. @@ -1064,7 +1065,31 @@ private DeleteResult cleanupStaleIndices(Map foundIndices assert false : e; logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e); } - return deleteResult; + } + + private void executeOneStaleIndexDelete(GroupedActionListener listener, BlockingQueue> indicesToDelete) throws InterruptedException { + final Map.Entry staleIndex = indicesToDelete.poll(0L, TimeUnit.MILLISECONDS); + if (staleIndex != null) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + String indexSnId = staleIndex.getKey(); + try { + logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); + DeleteResult deleteResult = staleIndex.getValue().delete(); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); + executeOneStaleIndexDelete(listener, indicesToDelete); + return deleteResult; + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] index {} is no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders", metadata.name(), indexSnId), e); + return DeleteResult.ZERO; + } catch (Exception e) { + assert false : e; + logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale index [{}]", metadata.name(), indexSnId), e); + return DeleteResult.ZERO; + } + })); + } } @Override