From b51ea25a00b355ee27a6b1c37d68377a9f201e4c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 23 Mar 2020 13:35:05 +0100 Subject: [PATCH] Use Azure Bulk Deletes in Azure Repository (#53919) (#53967) Now that we upgraded the Azure SDK to 8.6.2 in #53865 we can make use of bulk deletes. --- .../qa/microsoft-azure-storage/build.gradle | 2 - .../azure/AzureBlobContainer.java | 41 +------- .../repositories/azure/AzureBlobStore.java | 20 ++-- .../repositories/azure/AzureRepository.java | 2 +- .../azure/AzureRepositoryPlugin.java | 15 --- .../azure/AzureStorageService.java | 97 +++++++++---------- .../azure/AzureBlobContainerRetriesTests.java | 8 +- .../azure/AzureBlobStoreRepositoryTests.java | 2 +- .../java/fixture/azure/AzureHttpFixture.java | 2 +- .../java/fixture/azure/AzureHttpHandler.java | 55 ++++++++++- 10 files changed, 112 insertions(+), 132 deletions(-) diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle index b594ff3b172f2..3e5cd7758d22c 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle +++ b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle @@ -88,7 +88,5 @@ testClusters.integTest { // in a hacky way to change the protocol and endpoint. We must fix that. setting 'azure.client.integration_test.endpoint_suffix', { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE - String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0) - setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 2093139e115a3..6bd480f7923a9 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -23,17 +23,12 @@ import com.microsoft.azure.storage.StorageException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; -import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; @@ -42,20 +37,18 @@ import java.nio.file.NoSuchFileException; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; public class AzureBlobContainer extends AbstractBlobContainer { private final Logger logger = LogManager.getLogger(AzureBlobContainer.class); private final AzureBlobStore blobStore; - private final ThreadPool threadPool; private final String keyPath; - AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) { + AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) { super(path); this.blobStore = blobStore; this.keyPath = path.buildAsString(); - this.threadPool = threadPool; } private boolean blobExists(String blobName) { @@ -112,7 +105,7 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS @Override public DeleteResult delete() throws IOException { try { - return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); + return blobStore.deleteBlobDirectory(keyPath); } catch (URISyntaxException | StorageException e) { throw new IOException(e); } @@ -120,33 +113,9 @@ public DeleteResult delete() throws IOException { @Override public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { - final PlainActionFuture result = PlainActionFuture.newFuture(); - if (blobNames.isEmpty()) { - result.onResponse(null); - } else { - final GroupedActionListener listener = - new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size()); - final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); - // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint - // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. - for (String blobName : blobNames) { - executor.execute(ActionRunnable.run(listener, () -> { - logger.trace("deleteBlob({})", blobName); - try { - blobStore.deleteBlob(buildKey(blobName)); - } catch (StorageException e) { - if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { - throw new IOException(e); - } - } catch (URISyntaxException e) { - throw new IOException(e); - } - })); - } - } try { - result.actionGet(); - } catch (Exception e) { + blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList())); + } catch (URISyntaxException | StorageException e) { throw new IOException("Exception during bulk delete", e); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 714e29edea29d..173f13d801ff3 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -28,14 +28,13 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.repositories.azure.AzureRepository.Repository; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; +import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -44,17 +43,15 @@ public class AzureBlobStore implements BlobStore { private final AzureStorageService service; - private final ThreadPool threadPool; private final String clientName; private final String container; private final LocationMode locationMode; - public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) { + public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) { this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); this.clientName = Repository.CLIENT_NAME.get(metadata.settings()); this.service = service; - this.threadPool = threadPool; // locationMode is set per repository, not per client this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings()); final Map prevSettings = this.service.refreshAndClearCache(emptyMap()); @@ -80,7 +77,7 @@ public LocationMode getLocationMode() { @Override public BlobContainer blobContainer(BlobPath path) { - return new AzureBlobContainer(path, this, threadPool); + return new AzureBlobContainer(path, this); } @Override @@ -91,13 +88,12 @@ public boolean blobExists(String blob) throws URISyntaxException, StorageExcepti return service.blobExists(clientName, container, blob); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { - service.deleteBlob(clientName, container, blob); + public void deleteBlobsIgnoringIfNotExists(Collection blobs) throws URISyntaxException, StorageException { + service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs); } - public DeleteResult deleteBlobDirectory(String path, Executor executor) - throws URISyntaxException, StorageException, IOException { - return service.deleteBlobDirectory(clientName, container, path, executor); + public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException { + return service.deleteBlobDirectory(clientName, container, path); } public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { @@ -111,7 +107,7 @@ public Map listBlobsByPrefix(String keyPath, String prefix public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( - Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); + Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java index e07ffedb444dd..c39f951db56a7 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepository.java @@ -115,7 +115,7 @@ protected BlobStore getBlobStore() { @Override protected AzureBlobStore createBlobStore() { - final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool); + final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService); logger.debug(() -> new ParameterizedMessage( "using container [{}], chunk_size [{}], compress [{}], base_path [{}]", diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index d98a7c3cbd717..ae1258a73b4f5 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -23,16 +23,12 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.ReloadablePlugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.threadpool.ExecutorBuilder; -import org.elasticsearch.threadpool.ScalingExecutorBuilder; - import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -43,8 +39,6 @@ */ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { - public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure"; - // protected for testing final AzureStorageService azureStoreService; @@ -80,15 +74,6 @@ public List> getSettings() { ); } - @Override - public List> getExecutorBuilders(Settings settings) { - return Collections.singletonList(executorBuilder()); - } - - public static ExecutorBuilder executorBuilder() { - return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L)); - } - @Override public void reload(Settings settings) { // secure settings should be readable diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index ca7cb3475b445..a7ddcbf48e197 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -20,13 +20,16 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.AccessCondition; +import com.microsoft.azure.storage.BatchException; import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; @@ -42,7 +45,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.DeleteResult; @@ -53,7 +55,6 @@ import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import java.io.IOException; import java.io.InputStream; @@ -67,9 +68,10 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -188,72 +190,61 @@ public boolean blobExists(String account, String container, String blob) throws }); } - public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException { + public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection blobs) + throws URISyntaxException, StorageException { + logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs)); final Tuple> client = client(account); // Container name must be lower case. final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); - logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob)); - SocketAccess.doPrivilegedVoidException(() -> { - final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob); - logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob)); - azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get()); - }); + final Iterator blobIterator = blobs.iterator(); + int currentBatchSize = 0; + while (blobIterator.hasNext()) { + final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation(); + do { + batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()), + DeleteSnapshotsOption.NONE, null, null); + ++currentBatchSize; + } while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS); + currentBatchSize = 0; + try { + SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp)); + } catch (BatchException e) { + for (StorageException ex : e.getExceptions().values()) { + if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) { + logger.error("Batch exceptions [{}]", e.getExceptions()); + throw e; + } + } + } + } } - DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor) + DeleteResult deleteBlobDirectory(String account, String container, String path) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); - final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); - final AtomicLong outstanding = new AtomicLong(1L); - final PlainActionFuture result = PlainActionFuture.newFuture(); final AtomicLong blobsDeleted = new AtomicLong(); final AtomicLong bytesDeleted = new AtomicLong(); + final List blobsToDelete = new ArrayList<>(); SocketAccess.doPrivilegedVoidException(() -> { - for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { + for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); - outstanding.incrementAndGet(); - executor.execute(new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - final long len; - if (blobItem instanceof CloudBlob) { - len = ((CloudBlob) blobItem).getProperties().getLength(); - } else { - len = -1L; - } - deleteBlob(account, container, blobPath); - blobsDeleted.incrementAndGet(); - if (len >= 0) { - bytesDeleted.addAndGet(len); - } - } - - @Override - public void onFailure(Exception e) { - exceptions.add(e); - } - - @Override - public void onAfter() { - if (outstanding.decrementAndGet() == 0) { - result.onResponse(null); - } - } - }); + final long len; + if (blobItem instanceof CloudBlob) { + len = ((CloudBlob) blobItem).getProperties().getLength(); + } else { + len = -1L; + } + blobsToDelete.add(blobPath); + blobsDeleted.incrementAndGet(); + if (len >= 0) { + bytesDeleted.addAndGet(len); + } } }); - if (outstanding.decrementAndGet() == 0) { - result.onResponse(null); - } - result.actionGet(); - if (exceptions.isEmpty() == false) { - final IOException ex = new IOException("Deleting directory [" + path + "] failed"); - exceptions.forEach(ex::addSuppressed); - throw ex; - } + deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete); return new DeleteResult(blobsDeleted.get(), bytesDeleted.get()); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index ce3cba065c35b..b8ec6115b4d17 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -44,8 +44,6 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.TestThreadPool; -import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; @@ -64,7 +62,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; @@ -91,11 +88,9 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { private HttpServer httpServer; - private ThreadPool threadPool; @Before public void setUp() throws Exception { - threadPool = new TestThreadPool(getTestClass().getName(), AzureRepositoryPlugin.executorBuilder()); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); httpServer.start(); super.setUp(); @@ -105,7 +100,6 @@ public void setUp() throws Exception { public void tearDown() throws Exception { httpServer.stop(0); super.tearDown(); - ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS); } private BlobContainer createBlobContainer(final int maxRetries) { @@ -145,7 +139,7 @@ BlobRequestOptions getBlobRequestOptionsForWriteBlob() { .put(ACCOUNT_SETTING.getKey(), clientName) .build()); - return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service, threadPool), threadPool); + return new AzureBlobContainer(BlobPath.cleanPath(), new AzureBlobStore(repositoryMetaData, service)); } public void testReadNonexistentBlobThrowsNoSuchFileException() { diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index b23693fd268d4..47703b90e43cc 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,7 +64,7 @@ protected Collection> nodePlugins() { @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); + return Collections.singletonMap("/", new AzureBlobStoreHttpHandler("container")); } @Override diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java index 1def1439429e6..5498c4ebc033f 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java @@ -30,7 +30,7 @@ public class AzureHttpFixture { private AzureHttpFixture(final String address, final int port, final String container) throws IOException { this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0); - server.createContext("/" + container, new AzureHttpHandler(container)); + server.createContext("/", new AzureHttpHandler(container)); } private void start() throws Exception { diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index 7a94a8c9f2e57..6cd93fc9347fa 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -22,16 +22,21 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; @@ -67,18 +72,21 @@ public void handle(final HttpExchange exchange) throws IOException { assert read == -1 : "Request body should have been empty but saw [" + read + "]"; } try { + // Request body is closed in the finally block + final BytesReference requestBody = Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())); if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) { // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) final Map params = new HashMap<>(); RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); final String blockId = params.get("blockid"); - blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); + blobs.put(blockId, requestBody); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) { // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list) - final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); + final String blockList = + Streams.copyToString(new InputStreamReader(requestBody.streamInput(), StandardCharsets.UTF_8)); final List blockIds = Arrays.stream(blockList.split("")) .filter(line -> line.contains("")) .map(line -> line.substring(0, line.indexOf(""))) @@ -97,12 +105,12 @@ public void handle(final HttpExchange exchange) throws IOException { // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob) final String ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match"); if ("*".equals(ifNoneMatch)) { - if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) { + if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), requestBody) != null) { sendError(exchange, RestStatus.CONFLICT); return; } } else { - blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); + blobs.put(exchange.getRequestURI().getPath(), requestBody); } exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); @@ -190,6 +198,45 @@ public void handle(final HttpExchange exchange) throws IOException { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); + } else if (Regex.simpleMatch("POST /?comp=batch", request)) { + // Batch Delete (https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch) + try (BufferedReader reader = new BufferedReader(new InputStreamReader(requestBody.streamInput()))) { + final Set toDelete = reader.lines().filter(l -> l.startsWith("DELETE")) + .map(l -> l.split(" ")[1]).collect(Collectors.toSet()); + final BytesStreamOutput baos = new BytesStreamOutput(); + final String batchSeparator = "batchresponse_" + UUIDs.randomBase64UUID(); + try (Writer writer = new OutputStreamWriter(baos)) { + int contentId = 0; + for (String b : toDelete) { + writer.write("\r\n--" + batchSeparator + "\r\n" + + "Content-Type: application/http \r\n" + + "Content-ID: " + contentId++ + " \r\n"); + if (blobs.remove(b) == null) { + writer.write("\r\nHTTP/1.1 404 The specified blob does not exist. \r\n" + + "x-ms-error-code: BlobNotFound \r\n" + + "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" + + "x-ms-version: 2018-11-09\r\n" + + "Content-Length: 216 \r\n" + + "Content-Type: application/xml\r\n\r\n" + + " \r\n" + + "BlobNotFoundThe specified blob does not exist.\r\n" + + "RequestId:" + UUIDs.randomBase64UUID() + "\r\n" + + "Time:2020-01-01T01:01:01.0000000Z\r\n"); + } else { + writer.write( + "\r\nHTTP/1.1 202 Accepted \r\n" + + "x-ms-request-id: " + UUIDs.randomBase64UUID() + " \r\n" + + "x-ms-version: 2018-11-09\r\n\r\n"); + } + } + writer.write("--" + batchSeparator + "--"); + } + final Headers headers = exchange.getResponseHeaders(); + headers.add("Content-Type", + "multipart/mixed; boundary=" + batchSeparator); + exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), baos.size()); + baos.bytes().writeTo(exchange.getResponseBody()); + } } else { sendError(exchange, RestStatus.BAD_REQUEST); }