Skip to content

Commit

Permalink
Reduce Memory Use of Parallel Azure Blob Deletes (#71330)
Browse files Browse the repository at this point in the history
1. Limit the number of blob deletes to execute in parallel to `100`.
2. Use flat listing when deleting a directory to require fewer listings
in between deletes and keep the code simpler.

closes #71267
  • Loading branch information
original-brownbear authored Apr 6, 2021
1 parent b690798 commit 8213833
Showing 1 changed file with 48 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
Expand All @@ -58,20 +59,18 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Supplier;
import java.util.function.Function;

public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
Expand Down Expand Up @@ -153,8 +152,7 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service)
private boolean isListRequest(String httpMethod, URL url) {
return httpMethod.equals("GET") &&
url.getQuery() != null &&
url.getQuery().contains("comp=list") &&
url.getQuery().contains("delimiter=");
url.getQuery().contains("comp=list");
}

// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
Expand Down Expand Up @@ -216,91 +214,80 @@ public boolean blobExists(String blob) throws IOException {
}
}

// number of concurrent blob delete requests to use while bulk deleting
private static final int CONCURRENT_DELETES = 100;

public DeleteResult deleteBlobDirectory(String path) throws IOException {
final AtomicInteger blobsDeleted = new AtomicInteger(0);
final AtomicLong bytesDeleted = new AtomicLong(0);

final BlobServiceClient client = client();
SocketAccess.doPrivilegedVoidException(() -> {
final BlobContainerClient blobContainerClient = client.getBlobContainerClient(container);
final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient().getBlobContainerAsyncClient(container);
final Queue<String> directories = new ArrayDeque<>();
directories.offer(path);
String directoryName;
List<Mono<Void>> deleteTasks = new ArrayList<>();
while ((directoryName = directories.poll()) != null) {
final BlobListDetails blobListDetails = new BlobListDetails()
.setRetrieveMetadata(true);

final ListBlobsOptions options = new ListBlobsOptions()
.setPrefix(directoryName)
.setDetails(blobListDetails);

for (BlobItem blobItem : blobContainerClient.listBlobsByHierarchy("/", options, null)) {
final ListBlobsOptions options = new ListBlobsOptions()
.setPrefix(path)
.setDetails(new BlobListDetails().setRetrieveMetadata(true));
try {
blobContainerAsyncClient.listBlobs(options, null).flatMap(blobItem -> {
if (blobItem.isPrefix() != null && blobItem.isPrefix()) {
directories.offer(blobItem.getName());
return Mono.empty();
} else {
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobItem.getName());
final Mono<Void> deleteTask = blobAsyncClient.delete()
// Ignore not found blobs, as it's possible that due to network errors a request
// for an already deleted blob is retried, causing an error.
.onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
.onErrorMap(throwable -> new IOException("Error deleting blob " + blobItem.getName(), throwable));
deleteTasks.add(deleteTask);
final String blobName = blobItem.getName();
BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(blobName);
final Mono<Void> deleteTask = getDeleteTask(blobName, blobAsyncClient);
bytesDeleted.addAndGet(blobItem.getProperties().getContentLength());
blobsDeleted.incrementAndGet();
return deleteTask;
}
}
}, CONCURRENT_DELETES).then().block();
} catch (Exception e) {
filterDeleteExceptionsAndRethrow(e, new IOException("Deleting directory [" + path + "] failed"));
}

executeDeleteTasks(deleteTasks, () -> "Deleting directory [" + path + "] failed");
});

return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}

private static void filterDeleteExceptionsAndRethrow(Exception e, IOException exception) throws IOException {
int suppressedCount = 0;
for (Throwable suppressed : e.getSuppressed()) {
// We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
if (suppressed instanceof IOException) {
exception.addSuppressed(suppressed);
suppressedCount++;
if (suppressedCount > 10) {
break;
}
}
}
throw exception;
}

void deleteBlobList(List<String> blobs) throws IOException {
if (blobs.isEmpty()) {
return;
}

BlobServiceAsyncClient asyncClient = asyncClient();
SocketAccess.doPrivilegedVoidException(() -> {
List<Mono<Void>> deleteTasks = new ArrayList<>(blobs.size());
final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container);
for (String blob : blobs) {
final Mono<Void> deleteTask = blobContainerClient.getBlobAsyncClient(blob)
.delete()
// Ignore not found blobs
.onErrorResume(this::isNotFoundError, throwable -> Mono.empty())
.onErrorMap(throwable -> new IOException("Error deleting blob " + blob, throwable));

deleteTasks.add(deleteTask);
try {
Flux.fromIterable(blobs).flatMap(blob ->
getDeleteTask(blob, blobContainerClient.getBlobAsyncClient(blob)), CONCURRENT_DELETES).then().block();
} catch (Exception e) {
filterDeleteExceptionsAndRethrow(e,
new IOException("Unable to delete blobs "
+ AllocationService.firstListElementsToCommaDelimitedString(blobs, Function.identity(), false)));
}

executeDeleteTasks(deleteTasks, () -> "Unable to delete blobs " + blobs);
});
}

private boolean isNotFoundError(Throwable e) {
return e instanceof BlobStorageException && ((BlobStorageException) e).getStatusCode() == 404;
}

private void executeDeleteTasks(List<Mono<Void>> deleteTasks, Supplier<String> errorMessageSupplier) throws IOException {
try {
// zipDelayError executes all tasks in parallel and delays
// error propagation until all tasks have finished.
Mono.zipDelayError(deleteTasks, results -> null).block();
} catch (Exception e) {
final IOException exception = new IOException(errorMessageSupplier.get());
for (Throwable suppressed : e.getSuppressed()) {
// We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
if (suppressed instanceof IOException) {
exception.addSuppressed(suppressed);
}
}
throw exception;
}
private static Mono<Void> getDeleteTask(String blobName, BlobAsyncClient blobAsyncClient) {
return blobAsyncClient.delete()
// Ignore not found blobs, as it's possible that due to network errors a request
// for an already deleted blob is retried, causing an error.
.onErrorResume(e ->
e instanceof BlobStorageException && ((BlobStorageException) e).getStatusCode() == 404, throwable -> Mono.empty())
.onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable));
}

public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException {
Expand Down

0 comments on commit 8213833

Please sign in to comment.