Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize stale blobs deletion during snapshot delete #3796

Merged
merged 5 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
Expand All @@ -46,6 +48,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -328,4 +331,125 @@ public void testRepositoryVerification() throws Exception {
assertThat(ex.getMessage(), containsString("is not shared"));
}
}

public void testSnapshotShardBlobDelete() throws Exception {
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000);
createRepository(
"test-repo",
"mock",
Settings.builder()
.put("location", repositoryPath)
.put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < randomIntBetween(1, 1000); j++) {
index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < randomIntBetween(100, 10000); ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < randomIntBetween(100, 1000); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles);

logger.info("--> done");
}

public void testSnapshotShardBlobDeletionRepositoryThrowingError() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
int maxShardBlobDeleteBatchSize = randomIntBetween(1, 1000);
createRepository(
"test-repo",
"mock",
Settings.builder()
.put("location", repositoryPath)
.put(BlobStoreRepository.MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.getKey(), maxShardBlobDeleteBatchSize)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < randomIntBetween(1, 1000); j++) {
index(indexName, "_doc", Integer.toString(j), "foo", "bar" + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < randomIntBetween(100, 1000); ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < randomIntBetween(100, 1000); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Make repository to throw exception when trying to delete stale snapshot shard blobs
String clusterManagerNode = internalCluster().getMasterName();
((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo"))
.setThrowExceptionWhileDelete(true);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

// Make repository to work normally
((MockRepository) internalCluster().getInstance(RepositoriesService.class, clusterManagerNode).repository("test-repo"))
.setThrowExceptionWhileDelete(false);

// This snapshot should delete last snapshot's residual stale shard blobs as well
logger.info("--> delete first snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, firstSnapshot).get();

// Expect two files to remain in the repository:
// (1) index-(N+1)
// (2) index-latest
assertFileCount(repositoryPath, 2);

logger.info("--> done");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -235,6 +236,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

/**
* Setting to set batch size of stale snapshot shard blobs that will be deleted by snapshot workers as part of snapshot deletion.
* For optimal performance the value of the setting should be equal to or close to repository's max # of keys that can be deleted in single operation
* Most cloud storage support upto 1000 key(s) deletion in single operation, thus keeping default value to be 1000.
*/
public static final Setting<Integer> MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_snapshot_shard_blob_delete_batch_size",
1000, // the default maximum batch size of stale snapshot shard blobs deletion
Setting.Property.NodeScope
);

/**
* Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
* url-repository.
Expand All @@ -243,6 +255,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final boolean supportURLRepo;

private final int maxShardBlobDeleteBatch;

private final boolean compress;

private final boolean cacheRepositoryData;
Expand Down Expand Up @@ -358,6 +372,7 @@ protected BlobStoreRepository(
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxShardBlobDeleteBatch = MAX_SNAPSHOT_SHARD_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
}

@Override
Expand Down Expand Up @@ -902,15 +917,57 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;

try {
AtomicInteger counter = new AtomicInteger();
Collection<List<String>> subList = filesToDelete.stream()
.collect(Collectors.groupingBy(it -> counter.getAndIncrement() / maxShardBlobDeleteBatch))
.values();
final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>(subList);

final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), staleFilesToDeleteInBatch.size());
for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
}
}));

} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale shard blobs", snapshotIds), e);
listener.onFailure(e);
}
}

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage(
"[{}] Failed to delete following blobs during snapshot delete : {}",
metadata.name(),
filesToDelete
),
e
);
l.onFailure(e);
}
executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
piyushdaftary marked this conversation as resolved.
Show resolved Hide resolved
}));
}
}

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOExce
if (blockOnDeleteIndexN && blobNames.stream().anyMatch(name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))) {
blockExecutionAndMaybeWait("index-{N}");
}
if (setThrowExceptionWhileDelete) {
throw new IOException("Random exception");
}
super.deleteBlobsIgnoringIfNotExists(blobNames);
}

Expand Down