Reduce memory usage on Azure repository implementation
This commit moves the upload logic into the repository itself
instead of delegating it to the SDK.
Multi-block uploads are performed sequentially rather than in parallel,
which bounds the amount of outstanding memory.
Additionally, the number of I/O threads and heap arenas has been
reduced to 1 to lower the memory overhead.

Closes elastic#66385

Backport of  elastic#66489
fcofdez committed Dec 17, 2020
1 parent 369fc3b commit 727faf1
Showing 5 changed files with 183 additions and 40 deletions.
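For orientation before the per-file diff: the new write path stages each block with the Azure SDK sequentially and commits the block list at the end, rather than handing the whole stream to the SDK's parallel upload. Below is a minimal sketch of that pattern, simplified from the executeMultipartUpload method added in this commit; it assumes the surrounding AzureBlobStore context (inputStream, blobSize, partSize, failIfAlreadyExists and blockBlobAsyncClient in scope), with error handling and request conditions omitted.

final List<String> blockIds = new ArrayList<>();
long remaining = blobSize;
while (remaining > 0) {
    final long length = Math.min(partSize, remaining);
    final String blockId = UUIDs.base64UUID();
    // Read the next chunk of the stream into 64 KB buffers and stage it as one block,
    // blocking before the next chunk is read so only one block is in flight at a time.
    blockBlobAsyncClient
        .stageBlock(blockId, convertStreamToByteBuffer(inputStream, length, 64 * 1024), length)
        .block();
    blockIds.add(blockId);
    remaining -= length;
}
// Commit the staged blocks; overwrite is only allowed when failIfAlreadyExists is false.
blockBlobAsyncClient.commitBlockList(blockIds, failIfAlreadyExists == false).block();

Because each stageBlock call completes before the next chunk is read, at most one block's worth of buffers is outstanding per upload.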
@@ -24,7 +24,6 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.azure.AzureHttpHandler;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -49,7 +48,6 @@

import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66385")
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

@@ -31,11 +31,12 @@
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
@@ -44,17 +45,20 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
@@ -81,6 +85,7 @@
public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();

private final AzureStorageService service;

@@ -381,17 +386,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
try {
final BlobServiceClient client = client();
SocketAccess.doPrivilegedVoidException(() -> {
final BlobClient blob = client.getBlobContainerClient(container)
.getBlobClient(blobName);

ParallelTransferOptions parallelTransferOptions = getParallelTransferOptions();
BlobParallelUploadOptions blobParallelUploadOptions =
new BlobParallelUploadOptions(inputStream, blobSize)
.setParallelTransferOptions(parallelTransferOptions);
blob.uploadWithResponse(blobParallelUploadOptions, null, null);
});
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
} else {
executeMultipartUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
}
} catch (final BlobStorageException e) {
if (failIfAlreadyExists && e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
BlobErrorCode.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
@@ -405,12 +404,156 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}

private ParallelTransferOptions getParallelTransferOptions() {
ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions();
parallelTransferOptions.setBlockSizeLong(service.getUploadBlockSize())
.setMaxSingleUploadSizeLong(service.getSizeThresholdForMultiBlockUpload())
.setMaxConcurrency(service.getMaxUploadParallelism());
return parallelTransferOptions;
private void executeSingleUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
SocketAccess.doPrivilegedVoidException(() -> {
final BlobServiceAsyncClient asyncClient = asyncClient();

final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blobName);
final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();

final Flux<ByteBuffer> byteBufferFlux =
convertStreamToByteBuffer(inputStream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE);
final BlockBlobSimpleUploadOptions options = new BlockBlobSimpleUploadOptions(byteBufferFlux, blobSize);
BlobRequestConditions requestConditions = new BlobRequestConditions();
if (failIfAlreadyExists) {
requestConditions.setIfNoneMatch("*");
}
options.setRequestConditions(requestConditions);
blockBlobAsyncClient.uploadWithResponse(options).block();
});
}

private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
SocketAccess.doPrivilegedVoidException(() -> {
final BlobServiceAsyncClient asyncClient = asyncClient();
final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container)
.getBlobAsyncClient(blobName);
final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();

final long partSize = getUploadBlockSize();
final Tuple<Long, Long> multiParts = numberOfMultiparts(blobSize, partSize);
final int nbParts = multiParts.v1().intValue();
final long lastPartSize = multiParts.v2();
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

final List<String> blockIds = new ArrayList<>(nbParts);
for (int i = 0; i < nbParts; i++) {
final long length = i < nbParts - 1 ? partSize : lastPartSize;
final Flux<ByteBuffer> byteBufferFlux =
convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);

final String blockId = UUIDs.base64UUID();
blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
blockIds.add(blockId);
}

blockBlobAsyncClient.commitBlockList(blockIds, failIfAlreadyExists == false).block();
});
}

/**
* Converts the provided input stream into a Flux of ByteBuffers. To avoid keeping large amounts of outstanding
* memory, this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
* @param inputStream the InputStream to convert
* @param length the InputStream length
* @param chunkSize the chunk size in bytes
* @return a Flux of ByteBuffers
*/
private Flux<ByteBuffer> convertStreamToByteBuffer(InputStream inputStream, long length, int chunkSize) {
assert inputStream.markSupported() : "An InputStream with mark support was expected";
// We need to mark the InputStream as it's possible that we need to retry for the same chunk
inputStream.mark(Integer.MAX_VALUE);
return Flux.defer(() -> {
final AtomicLong currentTotalLength = new AtomicLong(0);
try {
inputStream.reset();
} catch (IOException e) {
throw new RuntimeException(e);
}
// This flux is subscribed by a downstream operator that finally queues the
// buffers into netty output queue. Sadly we are not able to get a signal once
// the buffer has been flushed, so we have to allocate those and let the GC
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// by 64 elements), that's why we provide 64kb buffers.
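// With 64 KB buffers, that initial request of 128 elements amounts to roughly
// 128 * 64 KB = 8 MB of buffered data per upload.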
return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))
.map(i -> i * chunkSize)
.concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
int numOfBytesRead = 0;
int offset = 0;
int len = (int) count;
final byte[] buffer = new byte[len];
while (numOfBytesRead != -1 && offset < count) {
numOfBytesRead = inputStream.read(buffer, offset, len);
offset += numOfBytesRead;
len -= numOfBytesRead;
if (numOfBytesRead != -1) {
currentTotalLength.addAndGet(numOfBytesRead);
}
}
if (numOfBytesRead == -1 && currentTotalLength.get() < length) {
throw new IllegalStateException(
"InputStream provided" + currentTotalLength + " bytes, less than the expected" + length + " bytes"
);
}
return ByteBuffer.wrap(buffer);
}))
.doOnComplete(() -> {
try {
if (inputStream.available() > 0) {
long totalLength = currentTotalLength.get() + inputStream.available();
throw new IllegalStateException(
"InputStream provided " + totalLength + " bytes, more than the expected " + length + " bytes"
);
} else if (currentTotalLength.get() > length) {
throw new IllegalStateException(
"Read more data than was requested. Size of data read: " + currentTotalLength.get() + "." +
" Size of data requested: " + length
);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}).subscribeOn(Schedulers.elastic()); // We need to subscribe on a different scheduler to avoid blocking the io threads when
// we read the input stream (i.e. when it's rate limited)
}

/**
* Returns the number of parts of size {@code partSize} needed to reach {@code totalSize},
* along with the size of the last (or unique) part.
*
* @param totalSize the total size
* @param partSize the part size
* @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and
* the size of the last part
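* For example, {@code numberOfMultiparts(11, 5)} returns {@code (3, 1)}: two full parts of five bytes
* and a last part of one byte.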
*/
static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long partSize) {
if (partSize <= 0) {
throw new IllegalArgumentException("Part size must be greater than zero");
}

if ((totalSize == 0L) || (totalSize <= partSize)) {
return Tuple.tuple(1L, totalSize);
}

final long parts = totalSize / partSize;
final long remaining = totalSize % partSize;

if (remaining == 0) {
return Tuple.tuple(parts, partSize);
} else {
return Tuple.tuple(parts + 1, remaining);
}
}

long getLargeBlobThresholdInBytes() {
return service.getSizeThresholdForMultiBlockUpload();
}

long getUploadBlockSize() {
return service.getUploadBlockSize();
}

private BlobServiceClient client() {
@@ -68,7 +68,7 @@ class AzureClientProvider extends AbstractLifecycleComponent {
private static final TimeValue DEFAULT_CONNECTION_TIMEOUT = TimeValue.timeValueSeconds(30);
private static final TimeValue DEFAULT_MAX_CONNECTION_IDLE_TIME = TimeValue.timeValueSeconds(60);
private static final int DEFAULT_MAX_CONNECTIONS = 50;
private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = Math.min(Runtime.getRuntime().availableProcessors(), 8) * 2;
private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = 1;
private static final int PENDING_CONNECTION_QUEUE_SIZE = -1; // see ConnectionProvider.ConnectionPoolSpec.pendingAcquireMaxCount

static final Setting<Integer> EVENT_LOOP_THREAD_COUNT = Setting.intSetting(
@@ -144,13 +144,12 @@ static AzureClientProvider create(ThreadPool threadPool, Settings settings) {
}

private static ByteBufAllocator createByteBufAllocator() {
int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
int nHeapArena = 1;
int pageSize = PooledByteBufAllocator.defaultPageSize();
int maxOrder = PooledByteBufAllocator.defaultMaxOrder();
int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();

return new PooledByteBufAllocator(false,
nHeapArena,
@@ -160,7 +159,7 @@ private static ByteBufAllocator createByteBufAllocator() {
tinyCacheSize,
smallCacheSize,
normalCacheSize,
useCacheForAllThreads);
false);
}

AzureBlobServiceClient createClient(AzureStorageSettings settings,
@@ -29,6 +29,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;

import java.net.InetSocketAddress;
import java.net.Proxy;
@@ -38,8 +39,6 @@
import java.util.Map;
import java.util.function.BiConsumer;

import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
import static com.azure.storage.blob.BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
import static java.util.Collections.emptyMap;

public class AzureStorageService {
@@ -57,11 +56,23 @@ public class AzureStorageService {
*/
public static final long MAX_BLOCK_NUMBER = 50000;

/**
* Default block size for multi-block uploads. The Azure repository will use the Put block and Put block list APIs to split the
* stream into several parts, each of block_size length, and will upload each part in its own request.
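* For example, with a 1 GB heap the default is 1 GB / 20 ≈ 51 MB, clamped between the 5 MB minimum
* and {@code MAX_BLOCK_SIZE}.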
*/
private static final ByteSizeValue DEFAULT_BLOCK_SIZE = new ByteSizeValue(
Math.max(
ByteSizeUnit.MB.toBytes(5), // minimum value
Math.min(
MAX_BLOCK_SIZE.getBytes(),
JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 20)),
ByteSizeUnit.BYTES);

/**
* The maximum size of a Block Blob.
* See https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
*/
public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * MAX_BLOCK_SIZE.getBytes();
public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * DEFAULT_BLOCK_SIZE.getBytes();

/**
* Maximum allowed blob size in Azure blob store.
@@ -70,8 +81,7 @@ public class AzureStorageService {

// see ModelHelper.BLOB_DEFAULT_MAX_SINGLE_UPLOAD_SIZE
private static final long DEFAULT_MAX_SINGLE_UPLOAD_SIZE = new ByteSizeValue(256, ByteSizeUnit.MB).getBytes();
private static final long DEFAULT_UPLOAD_BLOCK_SIZE = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
private static final int DEFAULT_MAX_PARALLELISM = BLOB_DEFAULT_NUMBER_OF_BUFFERS;
private static final long DEFAULT_UPLOAD_BLOCK_SIZE = DEFAULT_BLOCK_SIZE.getBytes();

// 'package' for testing
volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();
@@ -131,11 +141,6 @@ long getSizeThresholdForMultiBlockUpload() {
return DEFAULT_MAX_SINGLE_UPLOAD_SIZE;
}

// non-static, package private for testing
int getMaxUploadParallelism() {
return DEFAULT_MAX_PARALLELISM;
}

int getMaxReadRetries(String clientName) {
AzureStorageSettings azureStorageSettings = getClientSettings(clientName);
return azureStorageSettings.getMaxRetries();
@@ -173,11 +173,6 @@ long getSizeThresholdForMultiBlockUpload() {
return ByteSizeUnit.MB.toBytes(1);
}

@Override
int getMaxUploadParallelism() {
return 1;
}

@Override
int getMaxReadRetries(String clientName) {
return maxRetries;
@@ -415,6 +410,9 @@ public void testRetryUntilFail() throws IOException {
if (requestReceived.compareAndSet(false, true)) {
throw new AssertionError("Should not receive two requests");
} else {
// We have to try to read the body since the netty async http client sends the request
// lazily
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
}
} finally {
