Skip to content

Commit

Permalink
Experimental batch-blob deletion support WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
nicktindall committed Oct 11, 2024
1 parent 2c1e023 commit 8e08e60
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 32 deletions.
5 changes: 5 additions & 0 deletions gradle/verification-metadata.xml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@
<sha256 value="31915426834400cac854f48441c168d55aa6fc054527f28f1d242a7067affd14" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-blob-batch" version="12.23.1">
<artifact name="azure-storage-blob-batch-12.23.1.jar">
<sha256 value="8c11749c783222873f63f22575aa5ae7ee8f285388183b82d1a18db21f4d2eba" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.azure" name="azure-storage-common" version="12.26.1">
<artifact name="azure-storage-common-12.26.1.jar">
<sha256 value="b0297ac1a9017ccd8a1e5cf41fb8d00ff0adbdd06849f6c5aafb3208708264dd" origin="Generated by Gradle"/>
Expand Down
1 change: 1 addition & 0 deletions modules/repository-azure/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies {
api "com.azure:azure-identity:1.13.2"
api "com.azure:azure-json:1.2.0"
api "com.azure:azure-storage-blob:12.27.1"
api "com.azure:azure-storage-blob-batch:12.23.1"
api "com.azure:azure-storage-common:12.26.1"
api "com.azure:azure-storage-internal-avro:12.12.1"
api "com.azure:azure-xml:1.1.0"
Expand Down
5 changes: 1 addition & 4 deletions modules/repository-azure/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@
requires org.apache.logging.log4j;
requires org.apache.logging.log4j.core;

requires com.azure.core;
requires com.azure.http.netty;
requires com.azure.storage.blob;
requires com.azure.storage.common;
requires com.azure.identity;

requires io.netty.buffer;
requires io.netty.transport;
requires io.netty.resolver;
requires io.netty.common;

requires reactor.core;
requires reactor.netty.core;
requires reactor.netty.http;
requires com.azure.storage.blob.batch;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
Expand Down Expand Up @@ -84,21 +87,20 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.core.Strings.format;

public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
private static final int MAX_ELEMENTS_PER_BATCH = 256;
private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();

Expand Down Expand Up @@ -278,35 +280,27 @@ private static void filterDeleteExceptionsAndRethrow(Exception e, IOException ex
throw exception;
}

/**
* {@inheritDoc}
* <p>
* Note that in this Azure implementation we issue a series of individual
* <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/delete-blob">delete blob</a> calls rather than aggregating
* deletions into <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch">blob batch</a> calls.
* The reason for this is that the blob batch endpoint has limited support for SAS token authentication.
*
* @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization">
* API docs around SAS auth limitations</a>
* @see <a href="https://github.com/Azure/azure-storage-java/issues/538">Java SDK issue</a>
* @see <a href="https://github.com/elastic/elasticsearch/pull/65140#discussion_r528752070">Discussion on implementing PR</a>
*/
// NOTE(review): this span appears to come from a commit diff rendered without +/- markers, so lines of the
// previous implementation (parameter "blobs", reactive per-blob deletes) and of the new implementation
// (parameter "blobNames", blob-batch deletes) are interleaved below. Confirm against the actual file before editing.
@Override
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobs) {
if (blobs.hasNext() == false) {
public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator<String> blobNames) {
if (blobNames.hasNext() == false) {
// Nothing to delete: return before any client is looked up or built.
return;
}

BlobServiceAsyncClient asyncClient = asyncClient(purpose);
final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(purpose);
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
try {
// Per-blob path: turn the iterator into a Flux, map every blob name to getDeleteTask(...) with
// CONCURRENT_DELETES passed as the flatMap concurrency argument, and block until the pipeline completes.
Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobs, Spliterator.ORDERED), false))
.flatMap(blob -> getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES)
.then()
.block();
} catch (Exception e) {
// Any failure is handed to filterDeleteExceptionsAndRethrow together with a fresh IOException.
filterDeleteExceptionsAndRethrow(e, new IOException("Unable to delete blobs"));
// Batch path: build the BlobBatchClient from the container client (not the service client) so that
// the restype=container parameter is sent with the batch request; per the linked documentation this
// is what allows all SAS token types to be supported.
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=shared-access-signatures#authorization
// NOTE(review): the variable is named "batchAsyncClient" but is declared as BlobBatchClient and created via
// buildClient(), which looks like the blocking client - confirm and consider renaming.
BlobBatchClient batchAsyncClient = new BlobBatchClientBuilder(
azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(container)
).buildClient();
// Drain the iterator in chunks: every BlobBatch receives at most MAX_ELEMENTS_PER_BATCH delete
// operations, so an arbitrarily long iterator is split across several submitted batches.
while (blobNames.hasNext()) {
final BlobBatch currentBatch = batchAsyncClient.getBlobBatch();
// Number of delete operations queued in the current batch so far.
int counter = 0;
while (counter < MAX_ELEMENTS_PER_BATCH && blobNames.hasNext()) {
currentBatch.deleteBlob(container, blobNames.next());
counter++;
}
// NOTE(review): the value returned by submitBatch is not used, and this path does not call
// filterDeleteExceptionsAndRethrow as the per-blob path does - confirm how failed sub-requests
// (including deletes of blobs that do not exist) are meant to be handled here.
batchAsyncClient.submitBatch(currentBatch);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
Expand All @@ -23,9 +25,11 @@
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -35,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.regex.Matcher;
Expand All @@ -47,6 +52,8 @@
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate an Azure endpoint")
public class AzureHttpHandler implements HttpHandler {
private static final Logger logger = LogManager.getLogger(AzureHttpHandler.class);

private final Map<String, BytesReference> blobs;
private final String account;
private final String container;
Expand Down Expand Up @@ -264,7 +271,89 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);

} else if (Regex.simpleMatch("POST /" + account + "/" + container + "*restype=container*comp=batch*", request)) {
// Blob Batch (https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch)
final StringBuilder response = new StringBuilder();

try (BufferedReader requestReader = new BufferedReader(new InputStreamReader(exchange.getRequestBody()))) {
String batchBoundary = requestReader.readLine();
String responseBoundary = "batch_" + UUID.randomUUID();

logger.debug("Batch boundary: " + batchBoundary);
String line;
String contentId = null, requestId = null, toDelete = null;
while ((line = requestReader.readLine()) != null) {
if (batchBoundary.equals(line) || (batchBoundary + "--").equals(line)) {
// Found the end of a single request, process it
if (contentId == null || requestId == null || toDelete == null) {
throw new IllegalStateException(
"Missing contentId/requestId/toDelete: " + contentId + "/" + requestId + "/" + toDelete
);
}

// Process the deletion
blobs.remove("/" + account + toDelete);
response.append("--")
.append(responseBoundary)
.append("\r\n")
.append("Content-Type: application/http\r\n")
.append("Content-ID: ")
.append(contentId)
.append("\r\n\r\n")
.append("HTTP/1.1 202 Accepted\r\n")
.append("x-ms-delete-type-permanent: true\r\n")
.append("x-ms-request-id: ")
.append(requestId)
.append("\r\n")
.append("x-ms-version: 2018-11-09\r\n\r\n");

// Clear the state
toDelete = null;
contentId = null;
requestId = null;

logger.debug("--> Starting a new batch");
} else if (line.startsWith("Content-Type")
|| line.startsWith("Content-Transfer-Encoding")
|| line.startsWith("Accept")
|| line.startsWith("Content-Length")
|| line.startsWith("User-Agent")
|| line.startsWith("Date")
|| line.isBlank()) {
// Ignore
} else if (Regex.simpleMatch("x-ms-client-request-id: *", line)) {
if (requestId != null) {
throw new IllegalStateException("Got multiple request IDs in a single request?");
}
requestId = line.split("\\s")[1];
} else if (Regex.simpleMatch("Content-ID: *", line)) {
if (contentId != null) {
throw new IllegalStateException("Got multiple content IDs in a single request?");
}
contentId = line.split("\\s")[1];
} else if (Regex.simpleMatch("DELETE /" + container + "/*", line)) {
logger.debug("--> Got delete line: " + line);
String blobName = RestUtils.decodeComponent(line.split("(\\s|\\?)")[1]);
logger.debug("--> Deleting blob: " + blobName);
if (toDelete != null) {
throw new IllegalStateException("Got multiple deletes in a single request?");
}
toDelete = blobName;
} else {
logger.debug("--> Ignoring line: " + line);
}
}
response.append("--").append(responseBoundary).append("--\r\n0\r\n");
// Send the response
exchange.getResponseHeaders().add("Content-Type", "multipart/mixed; boundary=" + responseBoundary);
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), response.length());
logger.debug("--> Sending response:\n{}", response);
try (OutputStream responseBody = exchange.getResponseBody()) {
responseBody.write(response.toString().getBytes(StandardCharsets.UTF_8));
}
}
} else {
logger.warn("--> Unrecognised request received: {}", request);
sendError(exchange, RestStatus.BAD_REQUEST);
}
} finally {
Expand Down

0 comments on commit 8e08e60

Please sign in to comment.