Skip to content

Commit

Permalink
Parallelize stale blobs deletion during snapshot delete
Browse files Browse the repository at this point in the history
Signed-off-by: Piyush Daftary <[email protected]>
  • Loading branch information
piyushdaftary committed Feb 17, 2022
1 parent b9ff91d commit 9a48605
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -198,6 +201,57 @@ public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Excep
logger.info("--> done");
}

public void testSnapshotShardBlobDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(
"test-repo",
"mock",
Settings.builder().put("location", repositoryPath).put(BlobStoreRepository.MAX_STALE_BLOB_DELETE_BATCH_SIZE.getKey(), 10)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < 10; j++) {
index(indexName, "_doc", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < 10; ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 100; ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles);

logger.info("--> done");
}

private RepositoryMetadata findRepository(List<RepositoryMetadata> repositories, String name) {
for (RepositoryMetadata repository : repositories) {
if (repository.name().equals(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

/**
* Setting to set batch size of stale blobs to be deleted.
*/
public static final Setting<Integer> MAX_STALE_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_stale_blob_delete_batch_size",
1000,
Setting.Property.NodeScope
);

/**
* Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
* url-repository.
Expand All @@ -241,6 +250,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final boolean supportURLRepo;

private final int maxStaleBlobDeleteBatch;

private final boolean compress;

private final boolean cacheRepositoryData;
Expand Down Expand Up @@ -356,6 +367,7 @@ protected BlobStoreRepository(
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxStaleBlobDeleteBatch = MAX_STALE_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
}

@Override
Expand Down Expand Up @@ -900,15 +912,58 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;

final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
final List<String> partition = new ArrayList<>();

try {
for (String key : filesToDelete) {
partition.add(key);
if (maxStaleBlobDeleteBatch == partition.size()) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
partition.clear();
}
}
}));
if (partition.isEmpty() == false) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
}

final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), filesToDelete.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
}

} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale blobs", snapshotIds), e);
}
}

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
throw e;
}
}));

executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
}
}

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
Expand Down

0 comments on commit 9a48605

Please sign in to comment.