Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snapshot deletion faster #2

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -916,23 +916,25 @@ private void cleanupStaleBlobs(Collection<SnapshotId> deletedSnapshots, Map<Stri
listener.onResponse(deleteResult);
}, listener::onFailure), 2);

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
if (staleRootBlobs.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(groupedListener, () -> {
List<String> deletedBlobs =
cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
}));
}

final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
if (foundIndices.keySet().equals(survivingIndexIds)) {
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = foundIndices.entrySet().stream()
.filter(foundIndexEntry -> !survivingIndexIds.contains(foundIndexEntry.getKey()))
.collect(Collectors.toCollection(LinkedBlockingQueue::new));
if (staleIndicesToDelete.isEmpty()) {
groupedListener.onResponse(DeleteResult.ZERO);
} else {
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
cleanupStaleIndices(groupedListener, staleIndicesToDelete);
}
}

Expand Down Expand Up @@ -1040,31 +1042,57 @@ private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<S
return Collections.emptyList();
}

private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
private void cleanupStaleIndices(GroupedActionListener<DeleteResult> listener,
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete) {
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
DeleteResult deleteResult = DeleteResult.ZERO;
for (DeleteResult result : deleteResults) {
deleteResult = deleteResult.add(result);
}
listener.onResponse(deleteResult);
}, listener::onFailure), staleIndicesToDelete.size());

try {
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
}
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleIndicesToDelete.size());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible for staleIndicesToDelete to exceed the max threadPool size?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is very much possible as the max threads in this threadpool is 5 and number of stale indices can be easily in the dozens if not more. This is defined in the ThreadPool class constructor, the max is: org.elasticsearch.threadpool.ThreadPool#halfAllocatedProcessorsMaxFive.
The reason we take the min of the two is in case there are indeed less than 5 deletions required.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thank you for the explanation!

for (int i = 0; i < workers; ++i) {
executeOneStaleIndexDelete(groupedListener, staleIndicesToDelete);
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
listener.onResponse(DeleteResult.ZERO);
}
}

/**
 * Takes one stale index entry off the queue (if any remain) and deletes its blob
 * container on the SNAPSHOT thread pool. After finishing an entry, the worker chains
 * itself by calling this method again, so each worker drains the queue entry by entry.
 * When the queue is empty the worker simply stops; completion is signalled to the
 * caller through the grouped {@code listener}, which expects exactly one response per
 * original queue entry.
 *
 * @param listener             grouped listener receiving one {@link DeleteResult} per queue entry
 * @param staleIndicesToDelete shared queue of (index id, blob container) entries still to delete
 * @throws InterruptedException if interrupted while polling the queue
 */
private void executeOneStaleIndexDelete(GroupedActionListener<DeleteResult> listener,
                                        BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete)
    throws InterruptedException {
    // Zero-timeout poll: returns null immediately when the queue is drained,
    // which terminates this worker's chain of deletions.
    Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
    if (indexEntry != null) {
        final String indexSnId = indexEntry.getKey();
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
            DeleteResult deleteResult = DeleteResult.ZERO;
            try {
                logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
                deleteResult = indexEntry.getValue().delete();
                logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
            } catch (IOException e) {
                // Best effort: a failed delete is logged and reported as ZERO rather than
                // failing the whole cleanup; the stale index can be retried on a later run.
                logger.warn(() -> new ParameterizedMessage(
                    "[{}] index {} is no longer part of any snapshots in the repository, " +
                        "but failed to clean up their index folders", metadata.name(), indexSnId), e);
            } catch (Exception e) {
                // Blanket catch mirrors cleanupStaleIndices: unexpected runtime exceptions
                // must not break snapshot functionality, so they are logged and suppressed.
                assert false : e;
                logger.warn(() ->
                    new ParameterizedMessage("[{}] Exception during cleanup of stale index [{}]", metadata.name(), indexSnId), e);
            }

            // Chain the next deletion before reporting this entry's result, so this
            // worker keeps running until the shared queue is empty.
            executeOneStaleIndexDelete(listener, staleIndicesToDelete);
            return deleteResult;
        }));
    }
}

@Override
Expand Down