Use Azure blob batch API to delete blobs in batches #114566
module-info.java:

```diff
@@ -18,18 +18,15 @@
     requires org.apache.logging.log4j;
     requires org.apache.logging.log4j.core;

     requires com.azure.core;
     requires com.azure.http.netty;
     requires com.azure.storage.blob;
     requires com.azure.storage.common;
     requires com.azure.identity;

     requires io.netty.buffer;
     requires io.netty.transport;
     requires io.netty.resolver;
     requires io.netty.common;

     requires reactor.core;
     requires reactor.netty.core;
     requires reactor.netty.http;
+    requires com.azure.storage.blob.batch;
 }
```

Review comment: IntelliJ seemed to optimize the …
AzureBlobStore.java:

```diff
@@ -21,10 +21,13 @@
 import com.azure.core.util.BinaryData;
 import com.azure.storage.blob.BlobAsyncClient;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerAsyncClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceAsyncClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatch;
+import com.azure.storage.blob.batch.BlobBatchAsyncClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchStorageException;
 import com.azure.storage.blob.models.BlobErrorCode;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobItemProperties;
@@ -84,21 +87,20 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Spliterator;
-import java.util.Spliterators;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiPredicate;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;

 import static org.elasticsearch.core.Strings.format;

 public class AzureBlobStore implements BlobStore {
     private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
+    // See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
+    private static final int MAX_ELEMENTS_PER_BATCH = 256;

     private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
     private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
```
```diff
@@ -147,7 +149,8 @@ public AzureBlobStore(
                     && isPutBlockRequest(httpMethod, url) == false
                     && isPutBlockListRequest(httpMethod, url) == false,
                 Operation.PUT_BLOB
-            )
+            ),
+            new RequestMatcher(AzureBlobStore::isBatchDelete, Operation.BATCH_DELETE)
         );

         this.requestMetricsHandler = (purpose, method, url, metrics) -> {
@@ -172,6 +175,10 @@ && isPutBlockListRequest(httpMethod, url) == false,
         };
     }

+    private static boolean isBatchDelete(HttpMethod method, URL url) {
+        return method == HttpMethod.POST && url.getQuery() != null && url.getQuery().contains("comp=batch");
+    }
+
     private static boolean isListRequest(HttpMethod httpMethod, URL url) {
         return httpMethod == HttpMethod.GET && url.getQuery() != null && url.getQuery().contains("comp=list");
     }
```
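As an aside (not part of the diff): a batch request is a POST whose query string contains `comp=batch`, which is exactly what the new matcher keys on; a container-scoped client additionally sends `restype=container`. A minimal sketch of the URL shape, with made-up account and container names:

```java
import java.net.URL;

public class BatchUrlExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint; "myaccount" and "mycontainer" are made-up names
        URL url = new URL("https://myaccount.blob.core.windows.net/mycontainer?restype=container&comp=batch");
        // A POST to this URL would satisfy isBatchDelete and be counted as Operation.BATCH_DELETE
        System.out.println(url.getQuery().contains("comp=batch")); // prints: true
    }
}
```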
```diff
@@ -231,30 +238,28 @@ public boolean blobExists(OperationPurpose purpose, String blob) throws IOException {
         }
     }

     // number of concurrent blob delete requests to use while bulk deleting
     private static final int CONCURRENT_DELETES = 100;

     public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) {
         final AtomicInteger blobsDeleted = new AtomicInteger(0);
         final AtomicLong bytesDeleted = new AtomicLong(0);
+        final List<String> blobNames = new ArrayList<>();

         SocketAccess.doPrivilegedVoidException(() -> {
-            final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container);
-            final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
-                .setDetails(new BlobListDetails().setRetrieveMetadata(true));
+            final AzureBlobServiceClient client = getAzureBlobServiceClientClient(purpose);
+            final BlobContainerClient blobContainerClient = client.getSyncClient().getBlobContainerClient(container);
             try {
-                blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
-                    if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
-                        return Mono.empty();
-                    } else {
-                        final String blobName = blobItem.getName();
-                        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
-                        final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
-                        bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
-                        blobsDeleted.incrementAndGet();
-                        return deleteTask;
-                    }
-                }, CONCURRENT_DELETES).then().block();
+                final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
+                    .setDetails(new BlobListDetails().setRetrieveMetadata(true));
+                for (BlobItem blobItem : blobContainerClient.listBlobs(options, null)) {
+                    if (blobItem.isPrefix()) {
+                        continue;
+                    }
+                    blobNames.add(blobItem.getName());
+                    bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
+                    blobsDeleted.incrementAndGet();
+                }
+                if (blobNames.isEmpty() == false) {
+                    deleteListOfBlobs(client, blobNames.iterator());
+                }
             } catch (Exception e) {
                 filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
            }
```
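Directory deletion now lists blob names synchronously and hands them to deleteListOfBlobs, which chunks them into batches of at most MAX_ELEMENTS_PER_BATCH. As a rough, illustrative calculation of the request-count saving (not part of the PR):

```java
public class BatchCountExample {
    private static final int MAX_ELEMENTS_PER_BATCH = 256; // same limit as the PR

    public static void main(String[] args) {
        int blobs = 1000;
        // Ceiling division: number of batch submissions needed for 1000 blobs
        int batches = (blobs + MAX_ELEMENTS_PER_BATCH - 1) / MAX_ELEMENTS_PER_BATCH;
        System.out.println(batches); // prints 4, versus 1000 individual delete requests before this change
    }
}
```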
```diff
@@ -278,48 +283,44 @@ private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) {
         throw exception;
     }

-    /**
-     * {@inheritDoc}
-     * <p>
-     * Note that in this Azure implementation we issue a series of individual
-     * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob">delete blob</a> calls rather than aggregating
-     * deletions into <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch">blob batch</a> calls.
-     * The reason for this is that the blob batch endpoint has limited support for SAS token authentication.
-     *
-     * @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization">
-     *     API docs around SAS auth limitations</a>
-     * @see <a href="https://github.com/Azure/azure-storage-java/issues/538">Java SDK issue</a>
-     * @see <a href="https://github.com/elastic/elasticsearch/pull/65140#discussion_r528752070">Discussion on implementing PR</a>
-     */
     @Override
-    public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobs) {
-        if (blobs.hasNext() == false) {
+    public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
+        if (blobNames.hasNext() == false) {
             return;
         }
-
-        BlobServiceAsyncClient asyncClient = asyncClient(purpose);
-        SocketAccess.doPrivilegedVoidException(() -> {
-            final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
-            try {
-                Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobs, Spliterator.ORDERED), false))
-                    .flatMap(blob -> getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES)
-                    .then()
-                    .block();
-            } catch (Exception e) {
-                filterDeleteExceptionsAndRethrow(e, new IOException("Unable to delete blobs"));
-            }
-        });
+        SocketAccess.doPrivilegedVoidException(() -> deleteListOfBlobs(getAzureBlobServiceClientClient(purpose), blobNames));
     }

-    private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
-        return blobAsyncClient.delete()
-            // Ignore not found blobs, as it's possible that due to network errors a request
-            // for an already deleted blob is retried, causing an error.
-            .onErrorResume(
-                e -> e instanceof BlobStorageException blobStorageException && blobStorageException.getStatusCode() == 404,
-                throwable -> Mono.empty()
-            )
-            .onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
+    private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Iterator<String> blobNames) throws IOException {
+        // We need to use a container-scoped BlobBatchClient, so the restype=container parameter
+        // is sent, and we can support all SAS token types
+        // See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
+        final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
+            azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
+        ).buildAsyncClient();
+        final List<Mono<Void>> batchResponses = new ArrayList<>();
+        while (blobNames.hasNext()) {
+            final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
+            int counter = 0;
+            while (counter < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext()) {
+                currentBatch.deleteBlob(container, blobNames.next());
+                counter++;
+            }
+            batchResponses.add(batchAsyncClient.submitBatch(currentBatch));
+        }
+        try {
+            Flux.merge(batchResponses).collectList().block();
+        } catch (BlobBatchStorageException bbse) {
+            final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
+            for (BlobStorageException bse : batchExceptions) {
+                // If one of the requests failed with something other than a BLOB_NOT_FOUND, throw the encompassing exception
+                if (BlobErrorCode.BLOB_NOT_FOUND.equals(bse.getErrorCode()) == false) {
+                    throw new IOException("Failed to delete batch", bbse);
+                }
+            }
+        } catch (Exception e) {
+            throw new IOException("Unable to delete blobs");
+        }
     }

     public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {
```

Review comment (on lines +279 to +284, the container-scoped batch client): I assume this still works with non-container-scoped tokens and the other Azure credential types that we support?

Reply: I've just kicked off the tests with the latest changes: https://buildkite.com/elastic/elasticsearch-periodic/builds/4557 EDIT: third-party tests all still pass :)

Review comment (on the BLOB_NOT_FOUND filtering): For my education: does blobBatch process all batches before it throws any exception, or does it stop processing as soon as the first exception is encountered? I also assume all sub-requests in a batch get processed if the batch itself is processed, regardless of whether any of the sub-requests run into errors?

Reply: Yeah, it'll run the full batch then return all the results. This is tested in …
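To make the new control flow easier to see outside the diff, here is a compact, self-contained sketch of the same batching pattern against the azure-storage-blob-batch API. The method name and calling context are hypothetical; the SDK calls are the ones the PR uses:

```java
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchAsyncClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchDeleteSketch {
    private static final int MAX_ELEMENTS_PER_BATCH = 256;

    // Deletes the named blobs in batches of up to 256 sub-requests each
    static void deleteAll(BlobContainerAsyncClient containerClient, String containerName, Iterator<String> blobNames) {
        BlobBatchAsyncClient batchClient = new BlobBatchClientBuilder(containerClient).buildAsyncClient();
        List<Mono<Void>> responses = new ArrayList<>();
        while (blobNames.hasNext()) {
            BlobBatch batch = batchClient.getBlobBatch();
            for (int i = 0; i < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext(); i++) {
                batch.deleteBlob(containerName, blobNames.next());
            }
            responses.add(batchClient.submitBatch(batch));
        }
        Flux.merge(responses).blockLast(); // wait for every batch to complete
    }
}
```

Merging the per-batch Monos submits all batches concurrently and then waits for completion, mirroring the PR's Flux.merge(batchResponses).collectList().block(); failed sub-requests surface as a BlobBatchStorageException, which the PR filters for BLOB_NOT_FOUND as shown above.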
```diff
@@ -689,7 +690,8 @@ enum Operation {
         GET_BLOB_PROPERTIES("GetBlobProperties"),
         PUT_BLOB("PutBlob"),
         PUT_BLOCK("PutBlock"),
-        PUT_BLOCK_LIST("PutBlockList");
+        PUT_BLOCK_LIST("PutBlockList"),
+        BATCH_DELETE("BatchDelete");

         private final String key;
```
AzureClientProvider.java:

```diff
@@ -317,6 +317,11 @@ private enum RetryMetricsTracker implements HttpPipelinePolicy {

         @Override
         public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
+            if (requestIsPartOfABatch(context)) {
+                // Batch deletes fire once for each of the constituent requests, and they have a null response. Ignore those, we'll
+                // track metrics at the bulk level.
+                return next.process();
+            }
             Optional<Object> metricsData = context.getData(RequestMetricsTracker.ES_REQUEST_METRICS_CONTEXT_KEY);
             if (metricsData.isPresent() == false) {
                 assert false : "No metrics object associated with request " + context.getHttpRequest();
```

Review comment (on lines +320 to +324): It feels a bit odd that each individual deletion would go through the pipeline. But I assume that's just how the Azure SDK works.

Reply: It actually might come in handy if we were batching billable actions. Delete is free so it doesn't matter, but the other action you can batch ("set blob tier") is not, and batches are billed per constituent request. If we were batching those, we'd want to count the individual requests.
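An aside on the pipeline mechanics discussed above: any policy added to the client sees the batch sub-requests too, and can use the same context key to pass them through. A hypothetical pass-through example (the policy class and log line are made up; "Batch-Operation-Info" is the key the PR checks):

```java
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Mono;

// Hypothetical logging policy that ignores batch sub-requests
public class BatchAwareLoggingPolicy implements HttpPipelinePolicy {
    @Override
    public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        if (context.getData("Batch-Operation-Info").isPresent()) {
            return next.process(); // sub-request of a batch: let it through untouched
        }
        System.out.println("Sending " + context.getHttpRequest().getHttpMethod() + " " + context.getHttpRequest().getUrl());
        return next.process();
    }
}
```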
```diff
@@ -361,6 +366,11 @@ private RequestMetricsTracker(OperationPurpose purpose, RequestMetricsHandler requestMetricsHandler) {

         @Override
         public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
+            if (requestIsPartOfABatch(context)) {
+                // Batch deletes fire once for each of the constituent requests, and they have a null response. Ignore those, we'll
+                // track metrics at the bulk level.
+                return next.process();
+            }
             final RequestMetrics requestMetrics = new RequestMetrics();
             context.setData(ES_REQUEST_METRICS_CONTEXT_KEY, requestMetrics);
             return next.process().doOnSuccess((httpResponse) -> {
```
```diff
@@ -389,6 +399,10 @@ public HttpPipelinePosition getPipelinePosition() {
         }
     }

+    private static boolean requestIsPartOfABatch(HttpPipelineCallContext context) {
+        return context.getData("Batch-Operation-Info").isPresent();
+    }
+
     /**
      * The {@link RequestMetricsTracker} calls this when a request completes
      */
```
Review comment: This is the version consistent with the others from the BOM.