Skip to content

Commit

Permalink
Make snapshot deletes not block the repository during data blob delet…
Browse files Browse the repository at this point in the history
…es (#86514)

Snapshot deletes need not block other operations beyond the updates to the repository metadata at the beginning of the delete operation. All subsequent blob deletes can safely run async and concurrent to other operations.

This is the simplest possible implementation of this change that I could find. It's not the most optimal since concurrent deletes are not guarded against trying to delete the same blobs twice. I believe this is not a big issue in practice though.
For one, we batch overlapping deletes into single operations, so we will only try to redundantly delete blobs leaked by previous operations that are part of indices still referenced (which will generally by a very limited number of blobs I believe) and indices that went out of scope. Indices that went out of scope are deleted by listing out blobs and deleting them in turn, which means that we likely won't be attempting all that many redundant deletes even if the same index would be touched by concurrent delete operations and even if we did, the additional memory use would be bounded.
  • Loading branch information
original-brownbear authored Jun 16, 2022
1 parent b60ccc4 commit 55acdfa
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 83 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/86514.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 86514
summary: Make snapshot deletes not block the repository during data blob deletes
area: Snapshot/Restore
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.Scheduler;
Expand Down Expand Up @@ -275,12 +276,42 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
final SnapshotDeleteListener wrappedListener;
if (SnapshotsService.useShardGenerations(repositoryMetaVersion)) {
wrappedListener = listener;
} else {
wrappedListener = new SnapshotDeleteListener() {
@Override
public void onDone() {
listener.onDone();
}

@Override
public void onRepositoryDataWritten(RepositoryData repositoryData) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(threadPool.schedule(() -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onRepositoryDataWritten(repositoryData);
}, coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}

@Override
public void onFailure(Exception e) {
logCooldownInfo();
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(threadPool.schedule(() -> {
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
assert cancellable != null;
listener.onFailure(e);
}, coolDown, ThreadPool.Names.SNAPSHOT));
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
}
};
}
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, wrappedListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryConflictException;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;

import java.nio.file.Path;
Expand Down Expand Up @@ -270,7 +268,7 @@ public void testRepositoryConflict() throws Exception {
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository) internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles();
blockMasterOnWriteIndexFile(repo);
logger.info("--> start deletion of snapshot");
ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -802,12 +802,28 @@ private void startCleaner() {
.cluster()
.prepareCleanupRepository(trackedRepository.repositoryName)
.execute(mustSucceed(cleanupRepositoryResponse -> {
Releasables.close(releaseAll);
logger.info("--> completed cleanup of [{}]", trackedRepository.repositoryName);
final RepositoryCleanupResult result = cleanupRepositoryResponse.result();
assertThat(Strings.toString(result), result.blobs(), equalTo(0L));
assertThat(Strings.toString(result), result.bytes(), equalTo(0L));
startCleaner();
if (result.bytes() > 0L || result.blobs() > 0L) {
// we could legitimately run into dangling blobs as the result of a shard snapshot failing half-way
// through the snapshot because of a concurrent index-close or -delete. The second round of cleanup on
// the same repository however must always fully remove any dangling blobs since we block all concurrent
// operations on the repository here
client.admin()
.cluster()
.prepareCleanupRepository(trackedRepository.repositoryName)
.execute(mustSucceed(secondCleanupRepositoryResponse -> {
final RepositoryCleanupResult secondCleanupResult = secondCleanupRepositoryResponse.result();
assertThat(Strings.toString(secondCleanupResult), secondCleanupResult.blobs(), equalTo(0L));
assertThat(Strings.toString(secondCleanupResult), secondCleanupResult.bytes(), equalTo(0L));
Releasables.close(releaseAll);
logger.info("--> completed second cleanup of [{}]", trackedRepository.repositoryName);
startCleaner();
}));
} else {
Releasables.close(releaseAll);
logger.info("--> completed cleanup of [{}]", trackedRepository.repositoryName);
startCleaner();
}
}));

startedCleanup = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
Expand Down Expand Up @@ -77,7 +78,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {
listener.onFailure(createCreationException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -149,7 +150,7 @@ void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotId;

import java.io.IOException;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {
listener.onFailure(createUnknownTypeException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.repositories.SnapshotShardContext;
import org.elasticsearch.snapshots.AbortedSnapshotException;
import org.elasticsearch.snapshots.SnapshotDeleteListener;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
Expand Down Expand Up @@ -807,7 +808,7 @@ public void deleteSnapshots(
Collection<SnapshotId> snapshotIds,
long repositoryStateId,
Version repositoryMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
Expand Down Expand Up @@ -906,9 +907,8 @@ private void doDeleteShardSnapshots(
Map<String, BlobMetadata> rootBlobs,
RepositoryData repositoryData,
Version repoMetaVersion,
ActionListener<RepositoryData> listener
SnapshotDeleteListener listener
) {

if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
Expand Down Expand Up @@ -937,11 +937,9 @@ private void doDeleteShardSnapshots(
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
listener.onRepositoryDataWritten(updatedRepoData);
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
2
);
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(listener::onDone), 2);
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
asyncCleanupUnlinkedShardLevelBlobs(
repositoryData,
Expand All @@ -955,10 +953,10 @@ private void doDeleteShardSnapshots(
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
2
);
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> {
listener.onRepositoryDataWritten(newRepoData);
listener.onDone();
}), 2);
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
Expand Down Expand Up @@ -2627,8 +2625,8 @@ public void snapshotShard(SnapshotShardContext context) {
final long startTime = threadPool.absoluteTimeInMillis();
try {
final ShardGeneration generation = snapshotStatus.generation();
logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(context.indexId(), shardId);
logger.debug("[{}][{}] snapshot to [{}][{}][{}] ...", shardId, snapshotId, metadata.name(), context.indexId(), generation);
final Set<String> blobs;
if (generation == null) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.snapshots;

import org.elasticsearch.repositories.RepositoryData;

public interface SnapshotDeleteListener {

/**
* Invoked once a snapshot has been fully deleted from the repository.
*/
void onDone();

/**
* Invoked once the updated {@link RepositoryData} has been written to the repository.
*
* @param repositoryData updated repository data
*/
void onRepositoryDataWritten(RepositoryData repositoryData);

/**
* Invoked if writing updated {@link RepositoryData} to the repository failed. Once {@link #onRepositoryDataWritten(RepositoryData)} has
* been invoked this method will never be invoked.
*
* @param e exception during metadata steps of snapshot delete
*/
void onFailure(Exception e);
}
Loading

0 comments on commit 55acdfa

Please sign in to comment.