diff --git a/docs/changelog/114566.yaml b/docs/changelog/114566.yaml
new file mode 100644
index 0000000000000..6007152bb26ca
--- /dev/null
+++ b/docs/changelog/114566.yaml
@@ -0,0 +1,5 @@
+pr: 114566
+summary: Use Azure blob batch API to delete blobs in batches
+area: Distributed
+type: enhancement
+issues: []
diff --git a/docs/reference/snapshot-restore/repository-azure.asciidoc b/docs/reference/snapshot-restore/repository-azure.asciidoc
index c361414052e14..0e6e1478cfc55 100644
--- a/docs/reference/snapshot-restore/repository-azure.asciidoc
+++ b/docs/reference/snapshot-restore/repository-azure.asciidoc
@@ -259,6 +259,15 @@ include::repository-shared-settings.asciidoc[]
`primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
to `secondary_only`, it will force `readonly` to true.
+`delete_objects_max_size`::
+
+ (integer) Sets the maximum batch size, between 1 and 256, used for `BlobBatch` requests. Defaults to 256, which is the maximum
+ number supported by the https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks[Azure blob batch API].
+
+`max_concurrent_batch_deletes`::
+
+ (integer) Sets the maximum number of concurrent batch delete requests that will be submitted for any individual bulk delete with `BlobBatch`. Note that the effective number of concurrent deletes is further limited by the Azure client connection and event loop thread limits. Defaults to 10, minimum is 1, maximum is 100.
+
[[repository-azure-validation]]
==== Repository validation rules
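
(Editorial note, not part of the patch: the two repository settings documented above correspond to the Java setting keys exercised by the tests later in this diff. The snippet below is a minimal, hypothetical sketch of building repository settings with those keys; the values shown are illustrative only and must stay within the documented ranges of 1-256 and 1-100.)

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.azure.AzureRepository;

// Illustrative sketch only: maps the documented settings onto their Java setting keys.
class BatchDeleteSettingsExample {
    static Settings exampleRepositorySettings() {
        return Settings.builder()
            .put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")          // existing setting
            .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), 128)        // delete_objects_max_size
            .put(AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.getKey(), 5) // max_concurrent_batch_deletes
            .build();
    }
}
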
diff --git a/gradle/verification-metadata.xml b/gradle/verification-metadata.xml
index e2dfa89c8f3b8..5cfe7adb5ea49 100644
--- a/gradle/verification-metadata.xml
+++ b/gradle/verification-metadata.xml
@@ -144,6 +144,11 @@
+
+
+
+
+
diff --git a/modules/repository-azure/build.gradle b/modules/repository-azure/build.gradle
index eb938f663c810..d011de81f4fb3 100644
--- a/modules/repository-azure/build.gradle
+++ b/modules/repository-azure/build.gradle
@@ -30,6 +30,7 @@ dependencies {
api "com.azure:azure-identity:1.13.2"
api "com.azure:azure-json:1.2.0"
api "com.azure:azure-storage-blob:12.27.1"
+ api "com.azure:azure-storage-blob-batch:12.23.1"
api "com.azure:azure-storage-common:12.26.1"
api "com.azure:azure-storage-internal-avro:12.12.1"
api "com.azure:azure-xml:1.1.0"
diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
index a9bf0afa37e18..61940be247861 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
@@ -9,14 +9,18 @@
package org.elasticsearch.repositories.azure;
+import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.OperationPurpose;
+import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
@@ -31,6 +35,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -43,6 +48,7 @@
import java.util.stream.IntStream;
import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
+import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -225,6 +231,91 @@ public void testRequestTimeIsAccurate() throws IOException {
assertThat(recordedRequestTime, lessThanOrEqualTo(elapsedTimeMillis));
}
+ public void testBatchDeleteFailure() throws IOException {
+ final int deleteBatchSize = randomIntBetween(1, 30);
+ final String repositoryName = randomRepositoryName();
+ final String repository = createRepository(
+ repositoryName,
+ Settings.builder()
+ .put(repositorySettings(repositoryName))
+ .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
+ .build(),
+ true
+ );
+ final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
+ final BlobContainer container = getBlobContainer(dataNodeName, repository);
+
+ final List<String> blobsToDelete = new ArrayList<>();
+ final int numberOfBatches = randomIntBetween(3, 20);
+ final int numberOfBlobs = numberOfBatches * deleteBatchSize;
+ final int failedBatches = randomIntBetween(1, numberOfBatches);
+ for (int i = 0; i < numberOfBlobs; i++) {
+ byte[] bytes = randomBytes(randomInt(100));
+ String blobName = "index-" + randomAlphaOfLength(10);
+ container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
+ blobsToDelete.add(blobName);
+ }
+ Randomness.shuffle(blobsToDelete);
+ clearMetrics(dataNodeName);
+
+ // Handler will fail one or more of the batch requests
+ final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
+
+ // Exhaust the retries
+ IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
+ .forEach(i -> requestHandlers.offer(failNRequestRequestHandler));
+
+ logger.info("--> Failing {} of {} batches", failedBatches, numberOfBatches);
+
+ final IOException exception = assertThrows(
+ IOException.class,
+ () -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator())
+ );
+ assertEquals(Math.min(failedBatches, 10), exception.getSuppressed().length);
+ assertEquals(
+ (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1L)),
+ getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_REQUESTS_TOTAL)
+ );
+ assertEquals((failedBatches * (MAX_RETRIES + 1L)), getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_EXCEPTIONS_TOTAL));
+ assertEquals(failedBatches * deleteBatchSize, container.listBlobs(randomPurpose()).size());
+ }
+
+ private long getLongCounterTotal(String dataNodeName, String metricKey) {
+ return getTelemetryPlugin(dataNodeName).getLongCounterMeasurement(metricKey)
+ .stream()
+ .mapToLong(Measurement::getLong)
+ .reduce(0L, Long::sum);
+ }
+
+ /**
+ * Creates a {@link RequestHandler} that will persistently fail the first numberToFail distinct requests
+ * it sees. Any other requests are passed through to the delegate.
+ *
+ * @param numberToFail The number of requests to fail
+ * @return the handler
+ */
+ private static RequestHandler createFailNRequestsHandler(int numberToFail) {
+ final List<String> requestsToFail = new ArrayList<>(numberToFail);
+ return (exchange, delegate) -> {
+ final Headers requestHeaders = exchange.getRequestHeaders();
+ final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
+ boolean failRequest = false;
+ synchronized (requestsToFail) {
+ if (requestsToFail.contains(requestId)) {
+ failRequest = true;
+ } else if (requestsToFail.size() < numberToFail) {
+ requestsToFail.add(requestId);
+ failRequest = true;
+ }
+ }
+ if (failRequest) {
+ exchange.sendResponseHeaders(500, -1);
+ } else {
+ delegate.handle(exchange);
+ }
+ };
+ }
+
private void clearMetrics(String discoveryNode) {
internalCluster().getInstance(PluginsService.class, discoveryNode)
.filterPlugins(TestTelemetryPlugin.class)
diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
index 473d91da6e34c..bd21f208faac4 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
@@ -89,7 +89,9 @@ protected Settings repositorySettings(String repoName) {
.put(super.repositorySettings(repoName))
.put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
- .put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
+ .put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
+ .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), randomIntBetween(5, 256))
+ .put(AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.getKey(), randomIntBetween(1, 10));
if (randomBoolean()) {
settingsBuilder.put(AzureRepository.Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
}
@@ -249,6 +251,8 @@ protected void maybeTrack(String request, Headers headers) {
trackRequest("PutBlockList");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PutBlob");
+ } else if (Regex.simpleMatch("POST /*/*?*comp=batch*", request)) {
+ trackRequest("BlobBatch");
}
}
@@ -279,10 +283,22 @@ public void testLargeBlobCountDeletion() throws Exception {
}
public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
- try (BlobStore store = newBlobStore()) {
+ // Test with a smaller batch size here
+ final int deleteBatchSize = randomIntBetween(1, 30);
+ final String repositoryName = randomRepositoryName();
+ createRepository(
+ repositoryName,
+ Settings.builder()
+ .put(repositorySettings(repositoryName))
+ .put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
+ .build(),
+ true
+ );
+ try (BlobStore store = newBlobStore(repositoryName)) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
- List<String> blobsToDelete = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
+ final int toDeleteCount = randomIntBetween(deleteBatchSize, 3 * deleteBatchSize);
+ final List<String> blobsToDelete = new ArrayList<>();
+ for (int i = 0; i < toDeleteCount; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
index abd4f506a0bb3..6d5c17c392141 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java
@@ -30,6 +30,8 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -46,6 +48,7 @@
import static org.hamcrest.Matchers.not;
public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
+ private static final Logger logger = LogManager.getLogger(AzureStorageCleanupThirdPartyTests.class);
private static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("test.azure.fixture", "true"));
private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
@@ -89,8 +92,10 @@ protected SecureSettings credentials() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.default.account", System.getProperty("test.azure.account"));
if (hasSasToken) {
+ logger.info("--> Using SAS token authentication");
secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
} else {
+ logger.info("--> Using key authentication");
secureSettings.setString("azure.client.default.key", System.getProperty("test.azure.key"));
}
return secureSettings;
diff --git a/modules/repository-azure/src/main/java/module-info.java b/modules/repository-azure/src/main/java/module-info.java
index cd6be56b71543..731f1e0a9986a 100644
--- a/modules/repository-azure/src/main/java/module-info.java
+++ b/modules/repository-azure/src/main/java/module-info.java
@@ -18,10 +18,7 @@
requires org.apache.logging.log4j;
requires org.apache.logging.log4j.core;
- requires com.azure.core;
requires com.azure.http.netty;
- requires com.azure.storage.blob;
- requires com.azure.storage.common;
requires com.azure.identity;
requires io.netty.buffer;
@@ -29,7 +26,7 @@
requires io.netty.resolver;
requires io.netty.common;
- requires reactor.core;
requires reactor.netty.core;
requires reactor.netty.http;
+ requires com.azure.storage.blob.batch;
}
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
index a3f26424324fa..52bc1ee1399d4 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
@@ -138,7 +138,7 @@ public void writeMetadataBlob(
}
@Override
- public DeleteResult delete(OperationPurpose purpose) {
+ public DeleteResult delete(OperationPurpose purpose) throws IOException {
return blobStore.deleteBlobDirectory(purpose, keyPath);
}
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 829868797e38c..3c64bb9f3b830 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -25,6 +25,10 @@
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatch;
+import com.azure.storage.blob.batch.BlobBatchAsyncClient;
+import com.azure.storage.blob.batch.BlobBatchClientBuilder;
+import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
@@ -99,6 +103,8 @@
public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
+ // See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
+ public static final int MAX_ELEMENTS_PER_BATCH = 256;
private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
@@ -110,6 +116,8 @@ public class AzureBlobStore implements BlobStore {
private final String container;
private final LocationMode locationMode;
private final ByteSizeValue maxSinglePartUploadSize;
+ private final int deletionBatchSize;
+ private final int maxConcurrentBatchDeletes;
private final RequestMetricsRecorder requestMetricsRecorder;
private final AzureClientProvider.RequestMetricsHandler requestMetricsHandler;
@@ -129,6 +137,8 @@ public AzureBlobStore(
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
+ this.deletionBatchSize = Repository.DELETION_BATCH_SIZE_SETTING.get(metadata.settings());
+ this.maxConcurrentBatchDeletes = Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.get(metadata.settings());
List<RequestMatcher> requestMatchers = List.of(
new RequestMatcher((httpMethod, url) -> httpMethod == HttpMethod.HEAD, Operation.GET_BLOB_PROPERTIES),
@@ -147,17 +157,14 @@ public AzureBlobStore(
&& isPutBlockRequest(httpMethod, url) == false
&& isPutBlockListRequest(httpMethod, url) == false,
Operation.PUT_BLOB
- )
+ ),
+ new RequestMatcher(AzureBlobStore::isBlobBatch, Operation.BLOB_BATCH)
);
this.requestMetricsHandler = (purpose, method, url, metrics) -> {
try {
URI uri = url.toURI();
String path = uri.getPath() == null ? "" : uri.getPath();
- // Batch delete requests
- if (path.contains(container) == false) {
- return;
- }
assert path.contains(container) : uri.toString();
} catch (URISyntaxException ignored) {
return;
@@ -172,6 +179,10 @@ && isPutBlockListRequest(httpMethod, url) == false,
};
}
+ private static boolean isBlobBatch(HttpMethod method, URL url) {
+ return method == HttpMethod.POST && url.getQuery() != null && url.getQuery().contains("comp=batch");
+ }
+
private static boolean isListRequest(HttpMethod httpMethod, URL url) {
return httpMethod == HttpMethod.GET && url.getQuery() != null && url.getQuery().contains("comp=list");
}
@@ -231,95 +242,101 @@ public boolean blobExists(OperationPurpose purpose, String blob) throws IOExcept
}
}
- // number of concurrent blob delete requests to use while bulk deleting
- private static final int CONCURRENT_DELETES = 100;
-
- public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) {
+ public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) throws IOException {
final AtomicInteger blobsDeleted = new AtomicInteger(0);
final AtomicLong bytesDeleted = new AtomicLong(0);
SocketAccess.doPrivilegedVoidException(() -> {
- final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container);
+ final AzureBlobServiceClient client = getAzureBlobServiceClientClient(purpose);
+ final BlobContainerAsyncClient blobContainerAsyncClient = client.getAsyncClient().getBlobContainerAsyncClient(container);
final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
.setDetails(new BlobListDetails().setRetrieveMetadata(true));
- try {
- blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
- if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
- return Mono.empty();
- } else {
- final String blobName = blobItem.getName();
- BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
- final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
- bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
- blobsDeleted.incrementAndGet();
- return deleteTask;
- }
- }, CONCURRENT_DELETES).then().block();
- } catch (Exception e) {
- filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
- }
+ final Flux<String> blobsFlux = blobContainerAsyncClient.listBlobs(options).filter(bi -> bi.isPrefix() == false).map(bi -> {
+ bytesDeleted.addAndGet(bi.getProperties().getContentLength());
+ blobsDeleted.incrementAndGet();
+ return bi.getName();
+ });
+ deleteListOfBlobs(client, blobsFlux);
});
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
- private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) throws IOException {
- int suppressedCount = 0;
- for (Throwable suppressed : e.getSuppressed()) {
- // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
- if (suppressed instanceof IOException) {
- exception.addSuppressed(suppressed);
- suppressedCount++;
- if (suppressedCount > 10) {
- break;
- }
- }
+ @Override
+ public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
+ if (blobNames.hasNext() == false) {
+ return;
+ }
+ SocketAccess.doPrivilegedVoidException(
+ () -> deleteListOfBlobs(
+ getAzureBlobServiceClientClient(purpose),
+ Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobNames, Spliterator.ORDERED), false))
+ )
+ );
+ }
+
+ private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Flux<String> blobNames) throws IOException {
+ // We need to use a container-scoped BlobBatchClient, so the restype=container parameter
+ // is sent, and we can support all SAS token types
+ // See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
+ final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
+ azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
+ ).buildAsyncClient();
+ final List<Throwable> errors;
+ final AtomicInteger errorsCollected = new AtomicInteger(0);
+ try {
+ errors = blobNames.buffer(deletionBatchSize).flatMap(blobs -> {
+ final BlobBatch blobBatch = batchAsyncClient.getBlobBatch();
+ blobs.forEach(blob -> blobBatch.deleteBlob(container, blob));
+ return batchAsyncClient.submitBatch(blobBatch).then(Mono.<Throwable>empty()).onErrorResume(t -> {
+ // Ignore errors that are just 404s, send other errors downstream as values
+ if (AzureBlobStore.isIgnorableBatchDeleteException(t)) {
+ return Mono.empty();
+ } else {
+ // Propagate the first 10 errors only
+ if (errorsCollected.getAndIncrement() < 10) {
+ return Mono.just(t);
+ } else {
+ return Mono.empty();
+ }
+ }
+ });
+ }, maxConcurrentBatchDeletes).collectList().block();
+ } catch (Exception e) {
+ throw new IOException("Error deleting batches", e);
+ }
+ if (errors.isEmpty() == false) {
+ final int totalErrorCount = errorsCollected.get();
+ final String errorMessage = totalErrorCount > errors.size()
+ ? "Some errors occurred deleting batches, the first "
+ + errors.size()
+ + " are included as suppressed, but the total count was "
+ + totalErrorCount
+ : "Some errors occurred deleting batches, all errors included as suppressed";
+ final IOException ex = new IOException(errorMessage);
+ errors.forEach(ex::addSuppressed);
+ throw ex;
}
- throw exception;
}
/**
- * {@inheritDoc}
- *
- * Note that in this Azure implementation we issue a series of individual
- * delete blob calls rather than aggregating
- * deletions into blob batch calls.
- * The reason for this is that the blob batch endpoint has limited support for SAS token authentication.
+ * We can ignore {@link BlobBatchStorageException}s when they are just telling us some of the files were not found
*
- * @see
- * API docs around SAS auth limitations
- * @see Java SDK issue
- * @see Discussion on implementing PR
+ * @param exception An exception thrown by batch delete
+ * @return true if it is safe to ignore, false otherwise
*/
- @Override
- public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobs) {
- if (blobs.hasNext() == false) {
- return;
- }
-
- BlobServiceAsyncClient asyncClient = asyncClient(purpose);
- SocketAccess.doPrivilegedVoidException(() -> {
- final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
- try {
- Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobs, Spliterator.ORDERED), false))
- .flatMap(blob -> getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES)
- .then()
- .block();
- } catch (Exception e) {
- filterDeleteExceptionsAndRethrow(e, new IOException("Unable to delete blobs"));
+ private static boolean isIgnorableBatchDeleteException(Throwable exception) {
+ if (exception instanceof BlobBatchStorageException bbse) {
+ final Iterable<BlobStorageException> batchExceptions = bbse.getBatchExceptions();
+ for (BlobStorageException bse : batchExceptions) {
+ // If any requests failed with something other than a BLOB_NOT_FOUND, it is not ignorable
+ if (BlobErrorCode.BLOB_NOT_FOUND.equals(bse.getErrorCode()) == false) {
+ return false;
+ }
}
- });
- }
-
- private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
- return blobAsyncClient.delete()
- // Ignore not found blobs, as it's possible that due to network errors a request
- // for an already deleted blob is retried, causing an error.
- .onErrorResume(
- e -> e instanceof BlobStorageException blobStorageException && blobStorageException.getStatusCode() == 404,
- throwable -> Mono.empty()
- )
- .onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
+ return true;
+ }
+ return false;
}
public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) {
@@ -363,8 +380,7 @@ public Map listBlobsByPrefix(OperationPurpose purpose, Str
for (final BlobItem blobItem : containerClient.listBlobsByHierarchy("/", listBlobsOptions, null)) {
BlobItemProperties properties = blobItem.getProperties();
- Boolean isPrefix = blobItem.isPrefix();
- if (isPrefix != null && isPrefix) {
+ if (blobItem.isPrefix()) {
continue;
}
String blobName = blobItem.getName().substring(keyPath.length());
@@ -689,7 +705,8 @@ enum Operation {
GET_BLOB_PROPERTIES("GetBlobProperties"),
PUT_BLOB("PutBlob"),
PUT_BLOCK("PutBlock"),
- PUT_BLOCK_LIST("PutBlockList");
+ PUT_BLOCK_LIST("PutBlockList"),
+ BLOB_BATCH("BlobBatch");
private final String key;
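
(Editorial note, not part of the patch: for readers unfamiliar with the Reactor operators used in deleteListOfBlobs above, the following is a minimal, self-contained sketch of the same pattern using plain reactor-core rather than the Azure SDK. Blob names are buffered into fixed-size batches, at most N batches are in flight concurrently, failures are converted into values so the stream keeps draining, and the collected errors are inspected at the end. All names and the simulated failure are illustrative.)

import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative sketch only: mirrors the buffer/flatMap/onErrorResume shape of deleteListOfBlobs.
class BatchDeletePatternSketch {
    public static void main(String[] args) {
        final int batchSize = 4;            // analogous to delete_objects_max_size
        final int maxConcurrentBatches = 2; // analogous to max_concurrent_batch_deletes

        List<Throwable> errors = Flux.range(0, 10)
            .map(i -> "blob-" + i)
            .buffer(batchSize)                               // group blob names into batches
            .flatMap(batch -> submitBatch(batch)             // stand-in for BlobBatchAsyncClient#submitBatch
                .then(Mono.<Throwable>empty())
                .onErrorResume(Mono::just),                  // keep going; surface the failure as a value
                maxConcurrentBatches)                        // cap the number of in-flight batches
            .collectList()
            .block();

        System.out.println("failed batches: " + errors.size());
    }

    // Pretend batch submission that fails one batch to exercise the error path.
    private static Mono<Void> submitBatch(List<String> batch) {
        if (batch.contains("blob-0")) {
            return Mono.error(new RuntimeException("simulated failure for " + batch));
        }
        return Mono.empty();
    }
}
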
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
index 654742c980268..f92bbcbdd716d 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
@@ -317,6 +317,11 @@ private enum RetryMetricsTracker implements HttpPipelinePolicy {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
+ if (requestIsPartOfABatch(context)) {
+ // Batch deletes fire once for each of the constituent requests, and they have a null response. Ignore those, we'll track
+ // metrics at the bulk level.
+ return next.process();
+ }
Optional