Skip to content

Commit

Permalink
Make snapshot deletion faster
Browse files Browse the repository at this point in the history
The delete snapshot task takes longer than expected. A major reason for this is
that the (often many) stale indices are deleted iteratively.
In this commit we change the deletion to be concurrent using the SNAPSHOT threadpool.
Note that, to avoid flooding the threadpool queue with delete tasks, we follow the
same approach as `executeOneFileSnapshot()`: each worker schedules the next deletion
only once it finishes, so other tasks can still use the SNAPSHOT threadpool without
excessive delay.

fixes issue elastic#61513 from Elasticsearch project
  • Loading branch information
AmiStrn committed Feb 18, 2021
1 parent 747e1cc commit a482490
Showing 1 changed file with 43 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -929,10 +929,13 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = foundIndices.entrySet().stream()
.filter(foundIndexEntry -> !survivingIndexIds.contains(foundIndexEntry.getKey()))
.collect(Collectors.toCollection(LinkedBlockingQueue::new));
if (staleIndicesToDelete.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(groupedListener, staleIndicesToDelete);
}
}

Expand Down Expand Up @@ -1040,22 +1043,20 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(GroupedActionListener<DeleteResult> listener, BlockingQueue<Map.Entry<String, BlobContainer>> indicesToDelete) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), indicesToDelete.size());

try {
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), indicesToDelete.size());
for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(groupedListener, indicesToDelete);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
Expand All @@ -1064,7 +1065,31 @@ private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}

/**
 * Polls one stale index from {@code indicesToDelete} and, if one is available, schedules its
 * deletion on the SNAPSHOT threadpool. When the delete task completes — successfully or not —
 * it schedules the next queued deletion, so each worker chain occupies at most one threadpool
 * slot at a time and other tasks can still use the SNAPSHOT pool without much delay.
 *
 * @param listener        grouped listener sized to the total number of stale indices; every
 *                        scheduled task resolves exactly one slot of it
 * @param indicesToDelete shared queue of stale (index-id, blob-container) entries left to delete
 * @throws InterruptedException if the thread is interrupted while polling the queue
 */
private void executeOneStaleIndexDelete(GroupedActionListener<DeleteResult> listener, BlockingQueue<Map.Entry<String, BlobContainer>> indicesToDelete) throws InterruptedException {
    // Non-blocking poll (zero timeout): an empty queue just means this worker chain is done.
    final Map.Entry<String, BlobContainer> staleIndex = indicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
    if (staleIndex != null) {
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
            final String indexSnId = staleIndex.getKey();
            DeleteResult deleteResult = DeleteResult.ZERO;
            try {
                logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
                deleteResult = staleIndex.getValue().delete();
                logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
            } catch (IOException e) {
                // Best-effort cleanup: log and carry on so one bad index folder does not
                // fail the whole snapshot deletion.
                logger.warn(() -> new ParameterizedMessage(
                    "[{}] index {} is no longer part of any snapshots in the repository, " +
                    "but failed to clean up their index folders", metadata.name(), indexSnId), e);
            } catch (Exception e) {
                assert false : e;
                logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale index [{}]", metadata.name(), indexSnId), e);
            }
            // BUGFIX: schedule the next deletion OUTSIDE the try/catch, i.e. even when this
            // delete failed. Previously the recursion ran only on the success path, so a failed
            // delete silently terminated its worker chain; if every chain hit a failure the
            // remaining queue entries were never processed and the grouped listener (sized to
            // the full queue) never completed, hanging the snapshot-delete task.
            executeOneStaleIndexDelete(listener, indicesToDelete);
            return deleteResult;
        }));
    }
}

@Override
Expand Down

0 comments on commit a482490

Please sign in to comment.