Use Azure blob batch API to delete blobs in batches #114566
Conversation
Force-pushed from 2d9b166 to 8e08e60
```diff
@@ -30,6 +30,7 @@ dependencies {
   api "com.azure:azure-identity:1.13.2"
   api "com.azure:azure-json:1.2.0"
   api "com.azure:azure-storage-blob:12.27.1"
+  api "com.azure:azure-storage-blob-batch:12.23.1"
```
This is the version consistent with the others from the BOM.
```java
requires reactor.netty.core;
requires reactor.netty.http;
requires com.azure.storage.blob.batch;
```
IntelliJ seemed to optimize the `requires` directives; the ones removed above are all transitively required by `com.azure.storage.blob.batch`.
```diff
@@ -689,7 +690,8 @@ enum Operation {
     GET_BLOB_PROPERTIES("GetBlobProperties"),
     PUT_BLOB("PutBlob"),
     PUT_BLOCK("PutBlock"),
-    PUT_BLOCK_LIST("PutBlockList");
+    PUT_BLOCK_LIST("PutBlockList"),
+    BLOB_BATCH("BlobBatch");
```
We can't be specific about the type of operation we're performing in a batch without inspecting the request body. I think it's better to track `BlobBatch` than potentially erroneously track `BatchDelete` (if one day we start using batch to "set access tier").
Pinging @elastic/es-distributed (Team:Distributed)
Sorry for the delay here. I took a second, closer look at the changes. I think we might want to consider adding controls for resource usage (heap and concurrent requests).

Btw, the PR should now be labelled as `>enhancement` due to the new setting.
```diff
@@ -129,6 +134,7 @@ public AzureBlobStore(
     // locationMode is set per repository, not per client
     this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
     this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
+    this.maxDeletesPerBatch = Repository.DELETION_BATCH_SIZE_SETTING.get(metadata.settings());
```
Nit: Can we rename this field to `deletionBatchSize`, which is consistent with the setting name and avoids clashing with the static `MAX_ELEMENTS_PER_BATCH`?
Done
```diff
@@ -52,6 +52,10 @@ public static void doPrivilegedVoidException(StorageRunnable action) {
     }
 }
+
+public static <E extends Exception> void doPrivilegedVoidExceptionExplicit(Class<E> exception, StorageRunnable action) throws E {
+    doPrivilegedVoidException(action);
+}
```
Is this necessary? The existing code works OK without the explicit throws. If we want to change this, I'd prefer to update the existing method so that it explicitly throws IOException in its catch block when the cause is an IOException. Since that requires some cascading changes, I think a separate PR would be better.
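To make the suggestion concrete, here is a minimal, self-contained sketch of that alternative. All names here are hypothetical stand-ins (this is not the actual `SocketAccess` code): the existing wrapper rethrows an `IOException` directly when it is the underlying cause, rather than introducing a new overload.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch: the privileged wrapper unwraps a checked IOException
// cause instead of letting it escape inside a RuntimeException.
public class PrivilegedRethrowSketch {

    @FunctionalInterface
    interface StorageRunnable {
        void run() throws IOException;
    }

    static void doPrivilegedVoidException(StorageRunnable action) throws IOException {
        try {
            action.run();
        } catch (RuntimeException e) {
            if (e.getCause() instanceof IOException ioe) {
                throw ioe; // surface the checked cause to callers
            }
            throw e;
        }
    }

    // Returns true if the checked IOException cause surfaced to the caller.
    static boolean unwrapsIoCause() {
        try {
            doPrivilegedVoidException(() -> {
                throw new UncheckedIOException(new IOException("disk full"));
            });
        } catch (IOException e) {
            return "disk full".equals(e.getMessage());
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(unwrapsIoCause()); // true
    }
}
```

The cascading changes the comment mentions would come from every caller now having to handle the checked `throws IOException`.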
```java
for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
    if (blobItem.isPrefix()) {
        continue;
    }
    blobNames.add(blobItem.getName());
    bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
    blobsDeleted.incrementAndGet();
}
if (blobNames.isEmpty() == false) {
    deleteListOfBlobs(client, blobNames.iterator());
```
I wonder whether there is an issue in materializing all blobItems from the listing before invoking delete. If there are a large number of items, it could be rather inefficient. IIUC, `listBlobs` returns an Iterable that loads lazily. I think this change means we no longer leverage that?
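The concern can be illustrated with a toy example (plain Java, not the Azure SDK): a lazily-produced listing only generates the items that are actually consumed, whereas collecting everything into a list first forces the entire listing into memory before the first delete can be issued.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy illustration of lazy listing consumption (all names hypothetical).
public class LazyListingSketch {

    // Generates blob names on demand, counting how many were actually produced.
    static Iterator<String> lazyListing(int total, AtomicInteger produced) {
        return new Iterator<>() {
            private int next = 0;

            public boolean hasNext() {
                return next < total;
            }

            public String next() {
                produced.incrementAndGet();
                return "blob-" + next++;
            }
        };
    }

    // Consume exactly one batch of 256 names from a listing of one million.
    static int producedAfterOneBatch() {
        AtomicInteger produced = new AtomicInteger();
        Iterator<String> it = lazyListing(1_000_000, produced);
        List<String> batch = new ArrayList<>();
        while (batch.size() < 256 && it.hasNext()) {
            batch.add(it.next());
        }
        // Only the 256 consumed elements were ever generated; materializing
        // the listing first would have forced all 1,000,000 into memory.
        return produced.get();
    }

    public static void main(String[] args) {
        System.out.println(producedAfterOneBatch()); // 256
    }
}
```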
Good call, I've changed this now to use Flux all the way through. I think that should pipeline all this stuff better.
```java
final List<Mono<Void>> batchResponses = new ArrayList<>();
while (blobNames.hasNext()) {
    final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
    int counter = 0;
    while (counter < maxDeletesPerBatch && blobNames.hasNext()) {
        currentBatch.deleteBlob(container, blobNames.next());
        counter++;
    }
    batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
```
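Stripped of the Azure types, the batching logic above amounts to splitting an iterator into size-capped chunks. A plain-Java analogue (class and method names invented for illustration):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java analogue of the loop above: split an iterator of blob names into
// batches of at most maxPerBatch entries. (The real code adds each name to an
// Azure BlobBatch and submits the batch asynchronously.)
public class BatchSplitter {

    static List<List<String>> splitIntoBatches(Iterator<String> names, int maxPerBatch) {
        List<List<String>> batches = new ArrayList<>();
        while (names.hasNext()) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < maxPerBatch && names.hasNext()) {
                batch.add(names.next());
            }
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<List<String>> batches =
            splitIntoBatches(List.of("a", "b", "c", "d", "e").iterator(), 2);
        System.out.println(batches); // [[a, b], [c, d], [e]]
    }
}
```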
This is more likely a theoretical concern. Technically the number of concurrent requests here is also unbounded, while previously it was hard-coded to 100.
I've limited these. There is an underlying limit imposed at the node level (the max open connections for a pool shared between clients, which defaults to 50; Azure is HTTP/1.1, so that's effectively a global cap on concurrent requests for a node), and for the execution of the blocks that dispatch the requests there's a thread pool limit in the reactor runtime (which seems to default to 5 threads).

So I think with a limit of 100 the actual number of concurrent requests would be much lower. In any case I've added an explicit limit which is configurable and defaults to 10. Because these are bulk requests, that means by default a maximum of 2,560 concurrent individual deletes.
Thanks for explaining. Makes sense from the networking perspective. I should have been clearer: by unbounded number of requests, I mostly mean the number of request objects that are constructed during this process. I guess they are eagerly instantiated even when the underlying network stack is not ready to consume them? If so, they consume memory and in extreme cases may even lead to an OOM.
Ah yep, I understand. I think the limitation I added should restrict that, as it'll limit the number of concurrent subscribers. As I understand it, nothing happens until a subscriber asks for the next value(s), so there should only be at most 10 batch requests being processed at any one time.
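The bounded-subscriber behaviour being discussed can be demonstrated with a standard-library sketch (Semaphore plus a thread pool, standing in for Reactor's concurrency-limited `flatMap`; all names hypothetical). The invariant is that no more than `limit` "batch deletes" are ever in flight at once, no matter how many are queued:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of capping in-flight work; not the actual Reactor code.
public class BoundedConcurrencySketch {

    static int maxObservedConcurrency(int tasks, int limit) throws Exception {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(32);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(pool.submit(() -> {
                permits.acquireUninterruptibly(); // wait for a free slot
                try {
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    TimeUnit.MILLISECONDS.sleep(2); // simulate a batch request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release();
                }
            }));
        }
        for (Future<?> f : futures) {
            f.get();
        }
        pool.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) throws Exception {
        // With a limit of 10, no more than 10 tasks ever run at once.
        System.out.println(maxObservedConcurrency(100, 10) <= 10); // true
    }
}
```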
```java
final CountDownLatch allRequestsFinished = new CountDownLatch(deleteTasks.size());
final List<Throwable> errors = new CopyOnWriteArrayList<>();
```
Similarly, I think we should limit the number of errors. Also, seeing the CountDownLatch makes me wonder whether it is possible to leverage the Flux approach, similar to how the existing deletion code relies on `Flux.then().block()`. Maybe something like `Flux#fromIterable`, so that it takes a custom Iterable implementation that internally constructs deletion requests which in turn consume the listing response. I feel it could somewhat address my previous comments about limiting resource usage. It's just a rough idea; there may be issues that I just haven't noticed.
Fixed in b913fd0
Hi @nicktindall, I've created a changelog YAML for you.
LGTM
Thanks for the iterations!
```java
    logger.info("Using SAS token authentication");
    secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
} else {
    logger.info("Using key authentication");
```
Nit: can we add `-->` at the beginning of the logging messages? It's an informal convention to make these test logging messages easier to search for.
Added in d9ce4b5
```java
// We need to use a container-scoped BlobBatchClient, so the restype=container parameter
// is sent, and we can support all SAS token types
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
    azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
).buildAsyncClient();
```
I assume this still works with non-container-scoped tokens and the other Azure credential types that we support?
I've just kicked off the tests with the latest changes https://buildkite.com/elastic/elasticsearch-periodic/builds/4557
EDIT: third party tests all still pass :)
```java
    });
}, maxConcurrentBatchDeletes).collectList().block();
if (errors.isEmpty() == false) {
    final IOException ex = new IOException("Error deleting batches");
```
Nit: I think we can include a brief message about exactly how many errors were encountered if `errorsCollected` is greater than 10, so that it is clear that some errors were skipped.
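A minimal sketch of that suggestion (constant and method names hypothetical, not the code in the PR): when more errors were encountered than are retained as suppressed exceptions, say so in the message.

```java
import java.io.IOException;
import java.util.List;

// Sketch: an exception message that notes when some errors were dropped.
public class BatchDeleteErrors {

    static final int MAX_RETAINED_ERRORS = 10;

    static IOException buildDeleteException(List<Throwable> retained, int totalErrorCount) {
        String message = totalErrorCount > MAX_RETAINED_ERRORS
            ? "Error deleting batches: " + totalErrorCount + " errors encountered, only the first "
                + MAX_RETAINED_ERRORS + " are included as suppressed exceptions"
            : "Error deleting batches";
        IOException ex = new IOException(message);
        retained.forEach(ex::addSuppressed);
        return ex;
    }

    public static void main(String[] args) {
        IOException ex = buildDeleteException(
            List.of(new RuntimeException("409 Conflict"), new RuntimeException("500 Internal")),
            37
        );
        System.out.println(ex.getMessage().contains("37 errors")); // true
        System.out.println(ex.getSuppressed().length); // 2
    }
}
```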
Added in 84ff3c2
```java
} catch (RuntimeException e) {
    throw new IOException("Error deleting batches", e);
```
Mostly for my own education: why are we specifically catching RuntimeException here? Is there a concrete concern about anything thrown here, or is it to match the existing code? The existing code seems to catch a broader `Exception` instead.
It was really because there are no checked exceptions thrown in the try/catch block. Perhaps it's safer just to catch Exception in case there's a `SocketAccess.doPrivilegedVoidException`-type scenario going on in there somewhere.
Broadened to `Exception` in 103aec4
```java
/**
 * The maximum number of concurrent batch deletes
 */
static final Setting<Integer> MAX_CONCURRENT_BATCH_DELETES_SETTING = Setting.intSetting("max_concurrent_batch_deletes", 10, 1);
```
I suggest we give it a sensible max value, e.g. `100`.
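If adopted, the bounded setting might look like the following sketch, assuming the four-argument `Setting.intSetting(key, default, min, max)` overload from Elasticsearch's settings API:

```java
/**
 * The maximum number of concurrent batch deletes, bounded to a sensible range.
 */
static final Setting<Integer> MAX_CONCURRENT_BATCH_DELETES_SETTING =
    Setting.intSetting("max_concurrent_batch_deletes", 10, 1, 100);
```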
Added in 6c4697e
Co-authored-by: Yang Wang <[email protected]>
This PR implements blob deletion as one or more blob batch requests, rather than deleting each blob individually.

The reason this wasn't implemented originally was due to concerns around the blob batch API's SAS token auth support. The difference in the approach in this PR is down to the use of a container-scoped client, which sends an additional request parameter (`restype=container`). Using the API in this way means that SAS tokens are supported.

I ran this branch through the `elasticsearch / periodic` pipeline (results here) and everything passed. If I'm reading it correctly, that includes running the `AzureStorageCleanupThirdPartyTests` using a SAS token, and that test includes code paths that use the new bulk delete.

Closes ES-9777