Use Azure blob batch API to delete blobs in batches #114566

Merged

46 commits
8e08e60
Experimental batch-blob deletion support WIP
nicktindall Oct 11, 2024
8f67b0a
Handle null responses on success?! (temporary workaround)
nicktindall Oct 11, 2024
86a59e9
Check error responses, track metrics
nicktindall Oct 14, 2024
22cfdb8
Skip metrics on batched requests more explicitly
nicktindall Oct 14, 2024
83ef063
Test that batch delete is tracked
nicktindall Oct 14, 2024
798b83b
Undo broken request time tracking
nicktindall Oct 14, 2024
57286c4
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 14, 2024
219cf40
Use batch delete when deleting blob directory
nicktindall Oct 14, 2024
d0fc6ee
Fix naming
nicktindall Oct 14, 2024
c566966
Tidy up response templates
nicktindall Oct 14, 2024
d5aa10f
Remove most debug logging from AzureHttpHandler
nicktindall Oct 14, 2024
87b08e6
Submit all batch deletes concurrently
nicktindall Oct 14, 2024
b034c50
Don't swallow exception
nicktindall Oct 14, 2024
ebc47e8
Track "blob batch" instead of "batch delete" (we can't be specific wi…
nicktindall Oct 14, 2024
e3cb397
Fix date/time format
nicktindall Oct 14, 2024
5df3384
Add test for batch delete failure metrics/behaviour
nicktindall Oct 14, 2024
caf1ab7
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 14, 2024
05e01c1
Remove redundant code from metrics handler
nicktindall Oct 15, 2024
e744cbd
Make max deletes per batch configurable
nicktindall Oct 15, 2024
6115885
Improve handling of individual batch failures
nicktindall Oct 16, 2024
b08d6af
Make batches execute concurrently
nicktindall Oct 16, 2024
06a3b5d
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 16, 2024
84211f9
Remove redundant null check
nicktindall Oct 16, 2024
aa7ecfb
Don't close response body
nicktindall Oct 16, 2024
1b12a63
Log to indicate which authentication method we're using
nicktindall Oct 16, 2024
9a3718a
Assert that the non-failed deletes succeeded
nicktindall Oct 16, 2024
2f37338
Write our own subscriber to the delete tasks to more accurately captu…
nicktindall Oct 16, 2024
dda6e8a
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 16, 2024
d546ec5
Use reactor to list and delete
nicktindall Oct 22, 2024
8588247
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 22, 2024
4641580
Add docs for max_concurrent_batch_deletes
nicktindall Oct 22, 2024
6b94158
Tidy up
nicktindall Oct 22, 2024
8efb2e1
Remove SocketAccess#doPrivilegedVoidExceptionExplicit
nicktindall Oct 23, 2024
09f27b8
Naming
nicktindall Oct 23, 2024
b913fd0
Limit amount of suppressed errors
nicktindall Oct 23, 2024
ac772ec
Randomise max_concurrent_batch_deletes
nicktindall Oct 23, 2024
dfe0d9c
Comment wording
nicktindall Oct 23, 2024
6818ec7
Tidy
nicktindall Oct 23, 2024
eab8700
Update docs/changelog/114566.yaml
nicktindall Oct 23, 2024
93bfe04
Update docs/reference/snapshot-restore/repository-azure.asciidoc
nicktindall Oct 24, 2024
d9ce4b5
Add info log prefix pattern
nicktindall Oct 24, 2024
84ff3c2
Indicate the total count of errors when it exceeds our limit for incl…
nicktindall Oct 24, 2024
103aec4
Catch Exception instead of RuntimeException
nicktindall Oct 24, 2024
6c4697e
Set maximum for max_concurrent_batch_deletes
nicktindall Oct 24, 2024
beae254
Merge remote-tracking branch 'origin/main' into ES-9777_support_batch…
nicktindall Oct 24, 2024
5f1143e
Fix batch error handling
nicktindall Oct 24, 2024
5 changes: 5 additions & 0 deletions docs/changelog/114566.yaml
@@ -0,0 +1,5 @@
pr: 114566
summary: Use Azure blob batch API to delete blobs in batches
area: Distributed
type: enhancement
issues: []
9 changes: 9 additions & 0 deletions docs/reference/snapshot-restore/repository-azure.asciidoc
@@ -259,6 +259,15 @@ include::repository-shared-settings.asciidoc[]
`primary_only` or `secondary_only`. Defaults to `primary_only`. Note that if you set it
to `secondary_only`, it will force `readonly` to true.

`delete_objects_max_size`::

(integer) Sets the maximum batch size, between 1 and 256, used for `BlobBatch` requests. Defaults to 256, which is the maximum
number supported by the https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#remarks[Azure blob batch API].

`max_concurrent_batch_deletes`::

(integer) Sets the maximum number of concurrent batch delete requests that will be submitted for any individual bulk delete with `BlobBatch`. Note that the effective number of concurrent deletes is further limited by the Azure client connection and event loop thread limits. Defaults to 10, minimum is 1, maximum is 100.
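
These two settings combine: a bulk delete is split into batches of at most `delete_objects_max_size` blobs, and at most `max_concurrent_batch_deletes` of those batches are submitted concurrently. A minimal sketch in the style of this PR's tests, not code from the change itself; the setting keys are real, while the container and client values are made-up examples:

import org.elasticsearch.common.settings.Settings;

// Hypothetical sketch: building Azure repository settings with the new
// batch-delete options. Values shown are illustrative, not defaults.
Settings repositorySettings = Settings.builder()
    .put("container", "my-container")        // assumed example value
    .put("client", "default")                // assumed example value
    .put("delete_objects_max_size", 128)     // batch size: 1..256, default 256
    .put("max_concurrent_batch_deletes", 5)  // concurrency: 1..100, default 10
    .build();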

[[repository-azure-validation]]
==== Repository validation rules

5 changes: 5 additions & 0 deletions gradle/verification-metadata.xml
@@ -144,6 +144,11 @@
<sha256 value="31915426834400cac854f48441c168d55aa6fc054527f28f1d242a7067affd14" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-blob-batch" version="12.23.1">
<artifact name="azure-storage-blob-batch-12.23.1.jar">
<sha256 value="8c11749c783222873f63f22575aa5ae7ee8f285388183b82d1a18db21f4d2eba" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-common" version="12.26.1">
<artifact name="azure-storage-common-12.26.1.jar">
<sha256 value="b0297ac1a9017ccd8a1e5cf41fb8d00ff0adbdd06849f6c5aafb3208708264dd" origin="Generated by Gradle"/>
1 change: 1 addition & 0 deletions modules/repository-azure/build.gradle
@@ -30,6 +30,7 @@ dependencies {
api "com.azure:azure-identity:1.13.2"
api "com.azure:azure-json:1.2.0"
api "com.azure:azure-storage-blob:12.27.1"
api "com.azure:azure-storage-blob-batch:12.23.1"
This is the version consistent with the others from the BOM

api "com.azure:azure-storage-common:12.26.1"
api "com.azure:azure-storage-internal-avro:12.12.1"
api "com.azure:azure-xml:1.1.0"
@@ -9,14 +9,18 @@

package org.elasticsearch.repositories.azure;

import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesMetrics;
@@ -31,6 +35,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -43,6 +48,7 @@
import java.util.stream.IntStream;

import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -225,6 +231,91 @@ public void testRequestTimeIsAccurate() throws IOException {
assertThat(recordedRequestTime, lessThanOrEqualTo(elapsedTimeMillis));
}

public void testBatchDeleteFailure() throws IOException {
final int deleteBatchSize = randomIntBetween(1, 30);
final String repositoryName = randomRepositoryName();
final String repository = createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
.build(),
true
);
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
final BlobContainer container = getBlobContainer(dataNodeName, repository);

final List<String> blobsToDelete = new ArrayList<>();
final int numberOfBatches = randomIntBetween(3, 20);
final int numberOfBlobs = numberOfBatches * deleteBatchSize;
final int failedBatches = randomIntBetween(1, numberOfBatches);
for (int i = 0; i < numberOfBlobs; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = "index-" + randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
blobsToDelete.add(blobName);
}
Randomness.shuffle(blobsToDelete);
clearMetrics(dataNodeName);

// Handler will fail one or more of the batch requests
final RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);

// Exhaust the retries
IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
.forEach(i -> requestHandlers.offer(failNRequestRequestHandler));

logger.info("--> Failing {} of {} batches", failedBatches, numberOfBatches);

final IOException exception = assertThrows(
IOException.class,
() -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator())
);
assertEquals(Math.min(failedBatches, 10), exception.getSuppressed().length);
assertEquals(
(numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1L)),
getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_REQUESTS_TOTAL)
);
assertEquals((failedBatches * (MAX_RETRIES + 1L)), getLongCounterTotal(dataNodeName, RepositoriesMetrics.METRIC_EXCEPTIONS_TOTAL));
assertEquals(failedBatches * deleteBatchSize, container.listBlobs(randomPurpose()).size());
}

private long getLongCounterTotal(String dataNodeName, String metricKey) {
return getTelemetryPlugin(dataNodeName).getLongCounterMeasurement(metricKey)
.stream()
.mapToLong(Measurement::getLong)
.reduce(0L, Long::sum);
}

/**
* Creates a {@link RequestHandler} that will persistently fail the first <code>numberToFail</code> distinct requests
* it sees. Any other requests are passed through to the delegate.
*
* @param numberToFail The number of requests to fail
* @return the handler
*/
private static RequestHandler createFailNRequestsHandler(int numberToFail) {
final List<String> requestsToFail = new ArrayList<>(numberToFail);
return (exchange, delegate) -> {
final Headers requestHeaders = exchange.getRequestHeaders();
final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
boolean failRequest = false;
synchronized (requestsToFail) {
if (requestsToFail.contains(requestId)) {
failRequest = true;
} else if (requestsToFail.size() < numberToFail) {
requestsToFail.add(requestId);
failRequest = true;
}
}
if (failRequest) {
exchange.sendResponseHeaders(500, -1);
} else {
delegate.handle(exchange);
}
};
}

private void clearMetrics(String discoveryNode) {
internalCluster().getInstance(PluginsService.class, discoveryNode)
.filterPlugins(TestTelemetryPlugin.class)
@@ -89,7 +89,9 @@ protected Settings repositorySettings(String repoName) {
.put(super.repositorySettings(repoName))
.put(AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB))
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test");
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), randomIntBetween(5, 256))
.put(AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.getKey(), randomIntBetween(1, 10));
if (randomBoolean()) {
settingsBuilder.put(AzureRepository.Repository.BASE_PATH_SETTING.getKey(), randomFrom("test", "test/1"));
}
@@ -249,6 +251,8 @@ protected void maybeTrack(String request, Headers headers) {
trackRequest("PutBlockList");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PutBlob");
} else if (Regex.simpleMatch("POST /*/*?*comp=batch*", request)) {
trackRequest("BlobBatch");
}
}

@@ -279,10 +283,22 @@ public void testLargeBlobCountDeletion() throws Exception {
}

public void testDeleteBlobsIgnoringIfNotExists() throws Exception {
try (BlobStore store = newBlobStore()) {
// Test with a smaller batch size here
final int deleteBatchSize = randomIntBetween(1, 30);
final String repositoryName = randomRepositoryName();
createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.getKey(), deleteBatchSize)
.build(),
true
);
try (BlobStore store = newBlobStore(repositoryName)) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int toDeleteCount = randomIntBetween(deleteBatchSize, 3 * deleteBatchSize);
final List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < toDeleteCount; i++) {
byte[] bytes = randomBytes(randomInt(100));
String blobName = randomAlphaOfLength(10);
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
@@ -30,6 +30,8 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -46,6 +48,7 @@
import static org.hamcrest.Matchers.not;

public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {
private static final Logger logger = LogManager.getLogger(AzureStorageCleanupThirdPartyTests.class);
private static final boolean USE_FIXTURE = Booleans.parseBoolean(System.getProperty("test.azure.fixture", "true"));

private static final String AZURE_ACCOUNT = System.getProperty("test.azure.account");
@@ -89,8 +92,10 @@ protected SecureSettings credentials() {
MockSecureSettings secureSettings = new MockSecureSettings();
secureSettings.setString("azure.client.default.account", System.getProperty("test.azure.account"));
if (hasSasToken) {
logger.info("--> Using SAS token authentication");
secureSettings.setString("azure.client.default.sas_token", System.getProperty("test.azure.sas_token"));
} else {
logger.info("--> Using key authentication");
secureSettings.setString("azure.client.default.key", System.getProperty("test.azure.key"));
}
return secureSettings;
5 changes: 1 addition & 4 deletions modules/repository-azure/src/main/java/module-info.java
@@ -18,18 +18,15 @@
requires org.apache.logging.log4j;
requires org.apache.logging.log4j.core;

requires com.azure.core;
requires com.azure.http.netty;
requires com.azure.storage.blob;
requires com.azure.storage.common;
requires com.azure.identity;

requires io.netty.buffer;
requires io.netty.transport;
requires io.netty.resolver;
requires io.netty.common;

requires reactor.core;
requires reactor.netty.core;
requires reactor.netty.http;
requires com.azure.storage.blob.batch;
IntelliJ seemed to optimize the requires; the ones removed above are all transitively required by com.azure.storage.blob.batch.

}
@@ -138,7 +138,7 @@ public void writeMetadataBlob(
}

@Override
public DeleteResult delete(OperationPurpose purpose) {
public DeleteResult delete(OperationPurpose purpose) throws IOException {
return blobStore.deleteBlobDirectory(purpose, keyPath);
}
