Parallelize stale blobs deletion during snapshot delete #2159

Status: Closed. Wants to merge 2 commits.
@@ -35,6 +35,8 @@
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.Metadata;
@@ -46,6 +48,7 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
@@ -198,6 +201,57 @@ public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Excep
logger.info("--> done");
}

public void testSnapshotShardBlobDelete() throws Exception {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String firstSnapshot = "first-snapshot";
final String secondSnapshot = "second-snapshot";
final String indexName = "test-idx";

logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(
repositoryName,
"mock",
Settings.builder().put("location", repositoryPath).put(BlobStoreRepository.MAX_STALE_BLOB_DELETE_BATCH_SIZE.getKey(), 10)
);

logger.info("--> creating index-0 and ingest data");
createIndex(indexName);
ensureGreen();
for (int j = 0; j < 10; j++) {
index(indexName, "_doc", Integer.toString(10 + j), "foo", "bar" + 10 + j);
}
refresh();

logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, firstSnapshot);

int numberOfFiles = numberOfFiles(repositoryPath);

logger.info("--> adding some more documents to test index");
for (int j = 0; j < 10; ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < 100; ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
}
refresh();

logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, secondSnapshot);

// Delete second snapshot
logger.info("--> delete second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, secondSnapshot).get();

logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles);

logger.info("--> done");
}

private RepositoryMetadata findRepository(List<RepositoryMetadata> repositories, String name) {
for (RepositoryMetadata repository : repositories) {
if (repository.name().equals(name)) {
@@ -235,6 +235,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.NodeScope
);

/**
* Setting to set batch size of stale blobs to be deleted.
*/
public static final Setting<Integer> MAX_STALE_BLOB_DELETE_BATCH_SIZE = Setting.intSetting(
"max_stale_blob_delete_batch_size",
1000,
Collaborator:

This batch size is used for deleting the unreferenced shard blobs left behind by snapshot deletion, so a better name would probably be max_shard_blob_delete_batch_size. Also, was the default of 1000 chosen based on some experiment?

Contributor Author (@piyushdaftary, Jul 7, 2022):

Updated variable name. The default of 1000 was chosen because, for almost all remote storage services, the maximum batch size of a bulk delete request is 1000 objects.
References:
S3: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
GCP and Azure have internal delete limits of 1000 and 100 objects, respectively.
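As an aside for readers, the per-request limit discussed above is why the list of blobs to delete must be partitioned before issuing bulk deletes. A minimal, self-contained sketch of that partitioning (the BlobBatcher class and its names are illustrative, not part of the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class BlobBatcher {
    // Partition keys into batches of at most batchSize elements.
    // A batchSize of 1000 matches the S3 DeleteObjects per-request limit
    // referenced in the comment above.
    static List<List<String>> partition(List<String> keys, int batchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            // Copy the sublist so each batch is an independent list.
            batches.add(new ArrayList<>(keys.subList(i, Math.min(i + batchSize, keys.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            keys.add("blob-" + i);
        }
        List<List<String>> batches = partition(keys, 1000);
        // 2500 keys split into batches of 1000, 1000, and 500
        System.out.println(batches.size());        // prints 3
        System.out.println(batches.get(2).size()); // prints 500
    }
}
```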

Setting.Property.NodeScope
);

/**
* Setting to disable writing the {@code index.latest} blob which enables the contents of this repository to be used with a
* url-repository.
@@ -243,6 +252,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

protected final boolean supportURLRepo;

private final int maxStaleBlobDeleteBatch;

private final boolean compress;

private final boolean cacheRepositoryData;
@@ -358,6 +369,7 @@ protected BlobStoreRepository(
readOnly = metadata.settings().getAsBoolean("readonly", false);
cacheRepositoryData = CACHE_REPOSITORY_DATA.get(metadata.settings());
bufferSize = Math.toIntExact(BUFFER_SIZE_SETTING.get(metadata.settings()).getBytes());
maxStaleBlobDeleteBatch = MAX_STALE_BLOB_DELETE_BATCH_SIZE.get(metadata.settings());
}

@Override
@@ -902,15 +914,58 @@ private void asyncCleanupUnlinkedShardLevelBlobs(
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;

final BlockingQueue<List<String>> staleFilesToDeleteInBatch = new LinkedBlockingQueue<>();
final List<String> partition = new ArrayList<>();

try {
for (String key : filesToDelete) {
partition.add(key);
if (maxStaleBlobDeleteBatch == partition.size()) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
partition.clear();
}
}
}));
if (partition.isEmpty() == false) {
staleFilesToDeleteInBatch.add(new ArrayList<>(partition));
}
Collaborator:

We can simplify the above logic as below to avoid clearing the partition list, since we have to create a new list for each batch anyway.

List<String> partition = new ArrayList<>();
for (String key : filesToDelete) {
    partition.add(key);
    if (maxStaleBlobDeleteBatch == partition.size()) {
        staleFilesToDeleteInBatch.add(partition);
        partition = new ArrayList<>();
    }
}

if (partition.isEmpty() == false) {
    staleFilesToDeleteInBatch.add(partition);
}

Contributor Author:

The above-mentioned code does a shallow copy of the partition list, which will result in erroneous contents of the staleFilesToDeleteInBatch BlockingQueue. The PR code does a deep copy, which is why it is implemented this way.


final GroupedActionListener<Void> groupedListener = new GroupedActionListener<>(
ActionListener.wrap(r -> { listener.onResponse(null); }, listener::onFailure),
staleFilesToDeleteInBatch.size()
);

// Start as many workers as fit into the snapshot pool at once at the most
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), filesToDelete.size());
Collaborator:

We should use staleFilesToDeleteInBatch instead of filesToDelete to compute the worker count. For cases where there are fewer than maxStaleBlobDeleteBatch files, only one batch will be created, whereas the snapshot pool can have more than one worker.

for (int i = 0; i < workers; ++i) {
executeStaleShardDelete(staleFilesToDeleteInBatch, groupedListener);
Collaborator:

Is there any reason we are submitting the batches to delete in this fashion? At a high level, it divides the files to delete into batches; each of those batches then needs to be deleted in parallel by the snapshot threadpool workers. We can achieve this by simply:

  1. creating the batches as done in staleFilesToDeleteInBatch (it can be a regular list instead of a BlockingQueue), and
  2. having a for loop over the batches and submitting the work to the snapshot threadpool. The threadpool will handle spawning multiple threads and assigning work to each of them up to the max number as needed.
private void executeStaleShardDelete(List<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener) {
    for (List<String> batchToDelete : staleFilesToDeleteInBatch) {
        threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
            try {
                deleteFromContainer(blobContainer(), batchToDelete);
                l.onResponse(null);
            } catch (Exception e) {
                logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
                throw e;
            }
        }));
    }
}

Contributor:

So I can chime in here: the method @piyushdaftary is using is a work-stealing method. There are only about 5 threads in the threadpool, so the for loop initializes only the number of threads available for this action; each thread then steals work from the queue. This is very efficient and is done in the other methods in this class where parallelism was introduced. Therefore the queue must be blocking, and the loop makes sense as well, the way it is now.

This ensures that the threadpool queue is never used, which is important in order to avoid the queue being filled by this task when there is a huge number of shards to delete.

Collaborator:

@AmiStrn: Thanks for sharing the context. Here the batching is already done with 1k shards per batch, so if there are 50k shards to delete there will be 50 tasks overall to execute instead of 50k. This behavior is the same in both versions (implemented vs. suggested).

The benefit the current implementation has over the suggested one is that at any point in time it submits only 5 deletion tasks to the queue. Once threads are available to execute the submitted ones, it submits one more to the threadpool. This way we don't add 50 tasks all at once, and other snapshot-related tasks added to this common pool also get a chance to execute. It seems to me we are trying to provide fairness/priority to different snapshot tasks using the same threadpool. If that is the goal, I think we should separate the threadpools for different snapshot tasks. Thoughts?

Contributor:
That could be ok as well and would simplify some things, but I would suggest that is done separately and refactored as part of another PR.

Contributor Author:

@sohami: Here filesToDelete is the list of repository files to delete (shard blobs), not the shards themselves, so for 50K shards the number of actual files may be far higher.

One of the reasons for submitting batches in this fashion is to not make snapshot restore wait because snapshot deletion is taking a long time to complete.

I don't see any immediate concern with the current implementation, and I agree with @AmiStrn that refactoring the blocking queue and the delete execution should be done as a separate PR, modifying the logic in all the places, i.e. snapshot restore, snapshot shard, and snapshot delete stale indices.
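The bounded-submission behavior debated in this thread can be sketched outside OpenSearch with a plain executor, a blocking queue, and a CountDownLatch standing in for the GroupedActionListener (all class and method names here are illustrative, not from the PR). Each seeded worker polls one batch, processes it, and only then resubmits itself, so at most five tasks are ever in flight regardless of how many batches exist:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkStealingDelete {
    static final AtomicInteger processed = new AtomicInteger();

    // Hypothetical stand-in for deleteFromContainer(blobContainer(), batch).
    static void processBatch(List<String> batch) {
        processed.addAndGet(batch.size());
    }

    static void executeBatchDelete(ExecutorService pool,
                                   BlockingQueue<List<String>> batches,
                                   CountDownLatch done) {
        List<String> batch = batches.poll(); // non-blocking, like poll(0L, MILLISECONDS)
        if (batch == null) {
            return; // queue drained; this worker stops
        }
        pool.execute(() -> {
            try {
                processBatch(batch);
            } finally {
                done.countDown(); // report completion of this batch
                // Resubmit from inside the task, so a new task is only
                // queued once this worker has finished its current batch.
                executeBatchDelete(pool, batches, done);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();
        int numBatches = 50;
        for (int i = 0; i < numBatches; i++) {
            batches.add(List.of("blob-" + i + "-a", "blob-" + i + "-b"));
        }
        CountDownLatch done = new CountDownLatch(numBatches);
        ExecutorService pool = Executors.newFixedThreadPool(5); // like a 5-thread SNAPSHOT pool
        int workers = Math.min(5, batches.size());
        for (int i = 0; i < workers; i++) {
            executeBatchDelete(pool, batches, done); // seed at most 5 in-flight tasks
        }
        done.await();
        pool.shutdown();
        System.out.println(processed.get()); // prints 100 (50 batches x 2 blobs)
    }
}
```

The key property, as the collaborator notes, is that the pool's own queue never holds more than the seeded number of tasks, leaving room for other snapshot work.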

}

} catch (Exception e) {
Collaborator:

It seems like all the exceptions are passed through the listener model, so I'm not really sure when this top-level exception would be thrown here. But either way, it seems like we should invoke the listener passed to this method in case of this top-level exception as well. Any reason not to do that?

Contributor:
the ActionRunnable.wrap catches the exception and invokes l.onFailure(), so this will not be reached.

Collaborator:

Right that's what I thought, so we should remove this catch block.

Contributor Author:

Ideally this catch statement should never be reached, as we have ActionRunnable.wrap. But to be on the safe side, I think it's better to blanket-catch the exception and call listener.onFailure from the catch block in case of any runtime exception.

Do you see any concern with the catch block, @AmiStrn @sohami?

// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale blobs", snapshotIds), e);
Collaborator:
should the log message be .... during cleanup of unreferenced shard blobs

}
}

private void executeStaleShardDelete(BlockingQueue<List<String>> staleFilesToDeleteInBatch, GroupedActionListener<Void> listener)
throws InterruptedException {
List<String> filesToDelete = staleFilesToDeleteInBatch.poll(0L, TimeUnit.MILLISECONDS);
if (filesToDelete != null) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("{} Failed to delete blobs during snapshot delete", metadata.name()), e);
throw e;
}
}));

executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
Comment on lines +965 to +967
Contributor (@AmiStrn, Feb 24, 2022):

This is the part missing for the work-stealing method used throughout this file. Why did you choose to have the recursion outside the initial thread? This is a (minor) problem, since it doesn't correspond with the for loop initializing the threads.

Suggested change:

-    }));
-    executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+        executeStaleShardDelete(staleFilesToDeleteInBatch, listener);
+    }));

Also, this means you would need to change the way you handle errors. The exception would terminate the thread, so potentially you could reach a situation where the GroupedActionListener waits forever and the process gets stuck. For example:

  • X threads start running
  • Eventually, they all hit an exception and stop the recursion before the queue is empty!
  • the GroupedActionListener is stuck waiting for the tasks to be done...

I suggest not throwing the exception, but invoking l.onFailure(e) if there is an exception. This way the other shards may still be handled.

Collaborator:

ActionRunnable, which is of type AbstractRunnable, invokes l.onFailure(e) when the runnable throws any exception. Ref here

Contributor:

There is no reason not to simply invoke onFailure with the exception in it -> the same outcome, except that the exception would stop the current recursion and the process could hang.
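The failure-handling concern in this thread — that every batch must be reported to the grouped listener, success or failure, or completion hangs — can be illustrated with a small stand-alone sketch (hypothetical names; a CountDownLatch plays the role of the GroupedActionListener):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SafeBatchWorker {
    static final AtomicInteger succeeded = new AtomicInteger();
    static final AtomicInteger failed = new AtomicInteger();

    // Hypothetical delete that fails for one poisoned batch.
    static void deleteBatch(List<String> batch) {
        if (batch.contains("poison")) {
            throw new RuntimeException("simulated storage failure");
        }
        succeeded.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<String>> batches = new LinkedBlockingQueue<>();
        batches.add(List.of("blob-1"));
        batches.add(List.of("poison"));
        batches.add(List.of("blob-2"));

        // The latch must be counted down for every batch, success or
        // failure; if a worker thread just threw instead, await() would
        // hang exactly as described for the GroupedActionListener above.
        CountDownLatch done = new CountDownLatch(3);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 3; i++) {
            List<String> batch = batches.poll();
            pool.execute(() -> {
                try {
                    deleteBatch(batch);
                } catch (Exception e) {
                    failed.incrementAndGet(); // equivalent of l.onFailure(e)
                } finally {
                    done.countDown(); // completion is reported either way
                }
            });
        }
        done.await();
        pool.shutdown();
        System.out.println(succeeded.get() + " ok, " + failed.get() + " failed"); // prints "2 ok, 1 failed"
    }
}
```

In the PR itself, ActionRunnable.wrap provides this guarantee by routing thrown exceptions into the listener's onFailure.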

}
}

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.