Reduce memory usage on Azure repository implementation #66489

Merged (2 commits) on Dec 17, 2020
AzureBlobStoreRepositoryTests.java
@@ -24,7 +24,6 @@
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.azure.AzureHttpHandler;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@@ -50,7 +49,6 @@

import static org.hamcrest.Matchers.equalTo;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66385")
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {

AzureBlobStore.java
@@ -31,11 +31,12 @@
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
@@ -44,17 +45,20 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetadata;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.io.InputStream;
@@ -80,6 +84,7 @@
public class AzureBlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();

private final AzureStorageService service;

@@ -380,17 +385,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
try {
final BlobServiceClient client = client();
SocketAccess.doPrivilegedVoidException(() -> {
final BlobClient blob = client.getBlobContainerClient(container)
.getBlobClient(blobName);

ParallelTransferOptions parallelTransferOptions = getParallelTransferOptions();
BlobParallelUploadOptions blobParallelUploadOptions =
new BlobParallelUploadOptions(inputStream, blobSize)
.setParallelTransferOptions(parallelTransferOptions);
blob.uploadWithResponse(blobParallelUploadOptions, null, null);
});
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
} else {
executeMultipartUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
}
} catch (final BlobStorageException e) {
if (failIfAlreadyExists && e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
BlobErrorCode.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
@@ -404,12 +403,156 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
}

private ParallelTransferOptions getParallelTransferOptions() {
ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions();
parallelTransferOptions.setBlockSizeLong(service.getUploadBlockSize())
.setMaxSingleUploadSizeLong(service.getSizeThresholdForMultiBlockUpload())
.setMaxConcurrency(service.getMaxUploadParallelism());
return parallelTransferOptions;
private void executeSingleUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
SocketAccess.doPrivilegedVoidException(() -> {
final BlobServiceAsyncClient asyncClient = asyncClient();

final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blobName);
final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();

final Flux<ByteBuffer> byteBufferFlux =
convertStreamToByteBuffer(inputStream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE);
final BlockBlobSimpleUploadOptions options = new BlockBlobSimpleUploadOptions(byteBufferFlux, blobSize);
BlobRequestConditions requestConditions = new BlobRequestConditions();
if (failIfAlreadyExists) {
requestConditions.setIfNoneMatch("*");
}
options.setRequestConditions(requestConditions);
blockBlobAsyncClient.uploadWithResponse(options).block();
});
}

private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
SocketAccess.doPrivilegedVoidException(() -> {
final BlobServiceAsyncClient asyncClient = asyncClient();
final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container)
.getBlobAsyncClient(blobName);
final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();

final long partSize = getUploadBlockSize();
final Tuple<Long, Long> multiParts = numberOfMultiparts(blobSize, partSize);
final int nbParts = multiParts.v1().intValue();
final long lastPartSize = multiParts.v2();
assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";

final List<String> blockIds = new ArrayList<>(nbParts);
for (int i = 0; i < nbParts; i++) {
final long length = i < nbParts - 1 ? partSize : lastPartSize;
final Flux<ByteBuffer> byteBufferFlux =
convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);

final String blockId = UUIDs.base64UUID();
blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
blockIds.add(blockId);
}

blockBlobAsyncClient.commitBlockList(blockIds, failIfAlreadyExists == false).block();
});
}

/**
* Converts the provided input stream into a Flux of ByteBuffer. To avoid keeping large amounts of memory
* outstanding, this Flux reads the InputStream into ByteBuffers of {@code chunkSize} bytes.
* @param inputStream the InputStream to convert
* @param length the InputStream length
* @param chunkSize the chunk size in bytes
* @return a Flux of ByteBuffers
*/
private Flux<ByteBuffer> convertStreamToByteBuffer(InputStream inputStream, long length, int chunkSize) {
assert inputStream.markSupported() : "An InputStream with mark support was expected";
// We need to mark the InputStream as it's possible that we need to retry for the same chunk
inputStream.mark(Integer.MAX_VALUE);
return Flux.defer(() -> {
final AtomicLong currentTotalLength = new AtomicLong(0);
try {
inputStream.reset();
} catch (IOException e) {
throw new RuntimeException(e);
}
// This flux is subscribed by a downstream operator that finally queues the
// buffers into the Netty output queue. Sadly we are not able to get a signal once
// the buffer has been flushed, so we have to allocate those buffers and let the GC
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// 64 elements at a time), which is why we provide 64KB buffers (at most ~8MB of
// outstanding buffers per upload).
return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))
.map(i -> i * chunkSize)
.concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
int numOfBytesRead = 0;
int offset = 0;
int len = (int) count;
final byte[] buffer = new byte[len];
Review comment (Member):
This still scares me a little. If we only do 64k at a time here, can't we use the Netty memory allocator (or manage our own set of byte[] and recycle them on doOnComplete), or do we still have no guarantees about flushing at that point?

Review comment (Contributor Author):
Sadly we don't have guarantees about flushing at that point

Review comment (Contributor Author):
I explored a different approach where I was passing an allocator there, and recycling at the end of the request, but in that case you end up holding that memory for the entire duration of the request.

while (numOfBytesRead != -1 && offset < count) {
numOfBytesRead = inputStream.read(buffer, offset, len);
offset += numOfBytesRead;
len -= numOfBytesRead;
if (numOfBytesRead != -1) {
currentTotalLength.addAndGet(numOfBytesRead);
}
}
if (numOfBytesRead == -1 && currentTotalLength.get() < length) {
throw new IllegalStateException(
"InputStream provided" + currentTotalLength + " bytes, less than the expected" + length + " bytes"
);
}
return ByteBuffer.wrap(buffer);
}))
.doOnComplete(() -> {
try {
if (inputStream.available() > 0) {
long totalLength = currentTotalLength.get() + inputStream.available();
throw new IllegalStateException(
"InputStream provided " + totalLength + " bytes, more than the expected " + length + " bytes"
);
} else if (currentTotalLength.get() > length) {
throw new IllegalStateException(
"Read more data than was requested. Size of data read: " + currentTotalLength.get() + "." +
" Size of data requested: " + length
);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}).subscribeOn(Schedulers.elastic()); // We need to subscribe on a different scheduler to avoid blocking the io threads when
// we read the input stream (i.e. when it's rate limited)
}
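A minimal, hypothetical usage sketch of the chunking behaviour (the helper is private, so this is purely illustrative, and the sizes are assumed): an 8 KB mark-supporting stream read with a 4 KB chunk size yields two buffers.

// Hypothetical illustration only; not part of the diff.
InputStream in = new java.io.ByteArrayInputStream(new byte[8192]);
in.mark(Integer.MAX_VALUE);                        // the helper asserts mark support
Flux<ByteBuffer> chunks = convertStreamToByteBuffer(in, 8192, 4096);
long buffers = chunks.count().block();             // the stream is read in 4 KB chunks
assert buffers == 2L;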

/**
* Returns the number of parts of size {@code partSize} needed to reach {@code totalSize},
* along with the size of the last (or only) part.
*
* @param totalSize the total size
* @param partSize the part size
* @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and
* the size of the last part
*/
static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long partSize) {
if (partSize <= 0) {
throw new IllegalArgumentException("Part size must be greater than zero");
}

if ((totalSize == 0L) || (totalSize <= partSize)) {
return Tuple.tuple(1L, totalSize);
}

final long parts = totalSize / partSize;
final long remaining = totalSize % partSize;

if (remaining == 0) {
return Tuple.tuple(parts, partSize);
} else {
return Tuple.tuple(parts + 1, remaining);
}
}
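For illustration, a hypothetical snippet (sizes assumed, not part of the diff) showing how numberOfMultiparts splits a blob into upload blocks: a 10 MB blob with a 4 MB block size becomes two full parts plus a 2 MB final part.

// Hypothetical illustration only.
long totalSize = 10L * 1024 * 1024;                // 10 MB blob
long partSize = 4L * 1024 * 1024;                  // 4 MB upload block size
Tuple<Long, Long> parts = numberOfMultiparts(totalSize, partSize);
assert parts.v1() == 3L;                           // number of parts
assert parts.v2() == 2L * 1024 * 1024;             // size of the last part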

long getLargeBlobThresholdInBytes() {
return service.getSizeThresholdForMultiBlockUpload();
}

long getUploadBlockSize() {
return service.getUploadBlockSize();
}

private BlobServiceClient client() {
AzureClientProvider.java
@@ -68,7 +68,7 @@ class AzureClientProvider extends AbstractLifecycleComponent {
private static final TimeValue DEFAULT_CONNECTION_TIMEOUT = TimeValue.timeValueSeconds(30);
private static final TimeValue DEFAULT_MAX_CONNECTION_IDLE_TIME = TimeValue.timeValueSeconds(60);
private static final int DEFAULT_MAX_CONNECTIONS = 50;
private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = Math.min(Runtime.getRuntime().availableProcessors(), 8) * 2;
private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = 1;
private static final int PENDING_CONNECTION_QUEUE_SIZE = -1; // see ConnectionProvider.ConnectionPoolSpec.pendingAcquireMaxCount

static final Setting<Integer> EVENT_LOOP_THREAD_COUNT = Setting.intSetting(
@@ -144,13 +144,12 @@ static AzureClientProvider create(ThreadPool threadPool, Settings settings) {
}

private static ByteBufAllocator createByteBufAllocator() {
int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
int nHeapArena = 1;
int pageSize = PooledByteBufAllocator.defaultPageSize();
int maxOrder = PooledByteBufAllocator.defaultMaxOrder();
int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();

return new PooledByteBufAllocator(false,
nHeapArena,
Review comment (Contributor Author):
I think that we can reduce those too, wdyt @original-brownbear?

Review comment (Member):
Yea, can't we just do 1 maybe when we only use one thread now anyway?

@@ -160,7 +159,7 @@ private static ByteBufAllocator createByteBufAllocator() {
tinyCacheSize,
smallCacheSize,
normalCacheSize,
useCacheForAllThreads);
false);
}

AzureBlobServiceClient createClient(AzureStorageSettings settings,
AzureStorageService.java
@@ -29,15 +29,14 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;

import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URL;
import java.util.Map;
import java.util.function.BiConsumer;

import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
import static com.azure.storage.blob.BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
import static java.util.Collections.emptyMap;

public class AzureStorageService {
@@ -55,11 +54,23 @@ public class AzureStorageService {
*/
public static final long MAX_BLOCK_NUMBER = 50000;

/**
* Default block size for multi-block uploads. The Azure repository will use the Put Block and Put Block List APIs to split the
* stream into several parts, each of block_size length, and will upload each part in its own request.
*/
private static final ByteSizeValue DEFAULT_BLOCK_SIZE = new ByteSizeValue(
Math.max(
ByteSizeUnit.MB.toBytes(5), // minimum value
Math.min(
MAX_BLOCK_SIZE.getBytes(),
JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 20)),
ByteSizeUnit.BYTES);
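As a rough illustration of the formula above (heap sizes assumed, MAX_BLOCK_SIZE left symbolic), the default block size scales with the heap but is clamped to at least 5 MB:

// Hypothetical examples only.
// 1 GB heap:  1 GB / 20 ~ 51 MB -> block size ~51 MB (unless MAX_BLOCK_SIZE is smaller)
// 64 MB heap: 64 MB / 20 ~ 3 MB -> clamped up to the 5 MB minimum
long heapBytes = ByteSizeUnit.GB.toBytes(1);
long blockSize = Math.max(
    ByteSizeUnit.MB.toBytes(5),
    Math.min(MAX_BLOCK_SIZE.getBytes(), heapBytes / 20));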

/**
* The maximum size of a Block Blob.
* See https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
*/
public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * MAX_BLOCK_SIZE.getBytes();
public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * DEFAULT_BLOCK_SIZE.getBytes();

/**
* Maximum allowed blob size in Azure blob store.
@@ -68,8 +79,7 @@

// see ModelHelper.BLOB_DEFAULT_MAX_SINGLE_UPLOAD_SIZE
private static final long DEFAULT_MAX_SINGLE_UPLOAD_SIZE = new ByteSizeValue(256, ByteSizeUnit.MB).getBytes();
private static final long DEFAULT_UPLOAD_BLOCK_SIZE = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
private static final int DEFAULT_MAX_PARALLELISM = BLOB_DEFAULT_NUMBER_OF_BUFFERS;
private static final long DEFAULT_UPLOAD_BLOCK_SIZE = DEFAULT_BLOCK_SIZE.getBytes();

// 'package' for testing
volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();
@@ -129,11 +139,6 @@ long getSizeThresholdForMultiBlockUpload() {
return DEFAULT_MAX_SINGLE_UPLOAD_SIZE;
}

// non-static, package private for testing
int getMaxUploadParallelism() {
return DEFAULT_MAX_PARALLELISM;
}

int getMaxReadRetries(String clientName) {
AzureStorageSettings azureStorageSettings = getClientSettings(clientName);
return azureStorageSettings.getMaxRetries();
(test class overriding AzureStorageService defaults)
@@ -173,11 +173,6 @@ long getSizeThresholdForMultiBlockUpload() {
return ByteSizeUnit.MB.toBytes(1);
}

@Override
int getMaxUploadParallelism() {
return 1;
}

@Override
int getMaxReadRetries(String clientName) {
return maxRetries;
@@ -415,6 +410,9 @@ public void testRetryUntilFail() throws IOException {
if (requestReceived.compareAndSet(false, true)) {
throw new AssertionError("Should not receive two requests");
} else {
// We have to try to read the body since the netty async http client sends the request
// lazily
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
}
} finally {