diff --git a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
index 8f746ef25c25f..d56164857d2a0 100644
--- a/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
+++ b/plugins/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java
@@ -24,7 +24,6 @@
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import fixture.azure.AzureHttpHandler;
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -49,7 +48,6 @@
 import static org.hamcrest.Matchers.equalTo;
 
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66385")
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
 public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
 
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 462b3e88f453f..a2e6f80b2c829 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -31,11 +31,12 @@
 import com.azure.storage.blob.models.BlobItemProperties;
 import com.azure.storage.blob.models.BlobListDetails;
 import com.azure.storage.blob.models.BlobRange;
+import com.azure.storage.blob.models.BlobRequestConditions;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.models.DownloadRetryOptions;
 import com.azure.storage.blob.models.ListBlobsOptions;
-import com.azure.storage.blob.models.ParallelTransferOptions;
-import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
+import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.util.ReferenceCountUtil;
@@ -44,17 +45,20 @@
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetadata;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
+import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -81,6 +85,7 @@ public class AzureBlobStore implements BlobStore {
 
     private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
     private static final long DEFAULT_READ_CHUNK_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB).getBytes();
+    private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int) new ByteSizeValue(64, ByteSizeUnit.KB).getBytes();
 
     private final AzureStorageService service;
 
@@ -381,17 +386,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
             : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
         logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
         try {
-            final BlobServiceClient client = client();
-            SocketAccess.doPrivilegedVoidException(() -> {
-                final BlobClient blob = client.getBlobContainerClient(container)
-                    .getBlobClient(blobName);
-
-                ParallelTransferOptions parallelTransferOptions = getParallelTransferOptions();
-                BlobParallelUploadOptions blobParallelUploadOptions =
-                    new BlobParallelUploadOptions(inputStream, blobSize)
-                        .setParallelTransferOptions(parallelTransferOptions);
-                blob.uploadWithResponse(blobParallelUploadOptions, null, null);
-            });
+            if (blobSize <= getLargeBlobThresholdInBytes()) {
+                executeSingleUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
+            } else {
+                executeMultipartUpload(blobName, inputStream, blobSize, failIfAlreadyExists);
+            }
         } catch (final BlobStorageException e) {
             if (failIfAlreadyExists && e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
                 BlobErrorCode.BLOB_ALREADY_EXISTS.equals(e.getErrorCode())) {
@@ -405,12 +404,156 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b
         logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
     }
 
-    private ParallelTransferOptions getParallelTransferOptions() {
-        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions();
-        parallelTransferOptions.setBlockSizeLong(service.getUploadBlockSize())
-            .setMaxSingleUploadSizeLong(service.getSizeThresholdForMultiBlockUpload())
-            .setMaxConcurrency(service.getMaxUploadParallelism());
-        return parallelTransferOptions;
+    private void executeSingleUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
+        SocketAccess.doPrivilegedVoidException(() -> {
+            final BlobServiceAsyncClient asyncClient = asyncClient();
+
+            final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blobName);
+            final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();
+
+            final Flux<ByteBuffer> byteBufferFlux =
+                convertStreamToByteBuffer(inputStream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE);
+            final BlockBlobSimpleUploadOptions options = new BlockBlobSimpleUploadOptions(byteBufferFlux, blobSize);
+            BlobRequestConditions requestConditions = new BlobRequestConditions();
+            if (failIfAlreadyExists) {
+                requestConditions.setIfNoneMatch("*");
+            }
+            options.setRequestConditions(requestConditions);
+            blockBlobAsyncClient.uploadWithResponse(options).block();
+        });
+    }
+
+    private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
+        SocketAccess.doPrivilegedVoidException(() -> {
+            final BlobServiceAsyncClient asyncClient = asyncClient();
+            final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container)
+                .getBlobAsyncClient(blobName);
+            final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient();
+
+            final long partSize = getUploadBlockSize();
+            final Tuple<Long, Long> multiParts = numberOfMultiparts(blobSize, partSize);
+            final int nbParts = multiParts.v1().intValue();
+            final long lastPartSize = multiParts.v2();
+            assert blobSize == (((nbParts - 1) * partSize) + lastPartSize) : "blobSize does not match multipart sizes";
+
+            final List<String> blockIds = new ArrayList<>(nbParts);
+            for (int i = 0; i < nbParts; i++) {
+                final long length = i < nbParts - 1 ? partSize : lastPartSize;
+                final Flux<ByteBuffer> byteBufferFlux =
+                    convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);
+
+                final String blockId = UUIDs.base64UUID();
+                blockBlobAsyncClient.stageBlock(blockId, byteBufferFlux, length).block();
+                blockIds.add(blockId);
+            }
+
+            blockBlobAsyncClient.commitBlockList(blockIds, failIfAlreadyExists == false).block();
+        });
+    }
+
+    /**
+     * Converts the provided input stream into a Flux of ByteBuffer. To avoid having large amounts of outstanding
+     * memory this Flux reads the InputStream into ByteBuffers of {@code chunkSize} size.
+     * @param inputStream the InputStream to convert
+     * @param length the InputStream length
+     * @param chunkSize the chunk size in bytes
+     * @return a Flux of ByteBuffers
+     */
+    private Flux<ByteBuffer> convertStreamToByteBuffer(InputStream inputStream, long length, int chunkSize) {
+        assert inputStream.markSupported() : "An InputStream with mark support was expected";
+        // We need to mark the InputStream as it's possible that we need to retry for the same chunk
+        inputStream.mark(Integer.MAX_VALUE);
+        return Flux.defer(() -> {
+            final AtomicLong currentTotalLength = new AtomicLong(0);
+            try {
+                inputStream.reset();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            // This flux is subscribed by a downstream operator that finally queues the
+            // buffers into the netty output queue. Sadly we are not able to get a signal once
+            // the buffer has been flushed, so we have to allocate those and let the GC
+            // reclaim them (see MonoSendMany). Additionally, that very same operator requests
+            // 128 elements (that's hardcoded) once it's subscribed (later on, it requests
+            // by 64 elements), that's why we provide 64kb buffers.
+            return Flux.range(0, (int) Math.ceil((double) length / (double) chunkSize))
+                .map(i -> i * chunkSize)
+                .concatMap(pos -> Mono.fromCallable(() -> {
+                    long count = pos + chunkSize > length ? length - pos : chunkSize;
+                    int numOfBytesRead = 0;
+                    int offset = 0;
+                    int len = (int) count;
+                    final byte[] buffer = new byte[len];
+                    while (numOfBytesRead != -1 && offset < count) {
+                        numOfBytesRead = inputStream.read(buffer, offset, len);
+                        offset += numOfBytesRead;
+                        len -= numOfBytesRead;
+                        if (numOfBytesRead != -1) {
+                            currentTotalLength.addAndGet(numOfBytesRead);
+                        }
+                    }
+                    if (numOfBytesRead == -1 && currentTotalLength.get() < length) {
+                        throw new IllegalStateException(
+                            "InputStream provided " + currentTotalLength + " bytes, less than the expected " + length + " bytes"
+                        );
+                    }
+                    return ByteBuffer.wrap(buffer);
+                }))
+                .doOnComplete(() -> {
+                    try {
+                        if (inputStream.available() > 0) {
+                            long totalLength = currentTotalLength.get() + inputStream.available();
+                            throw new IllegalStateException(
+                                "InputStream provided " + totalLength + " bytes, more than the expected " + length + " bytes"
+                            );
+                        } else if (currentTotalLength.get() > length) {
+                            throw new IllegalStateException(
+                                "Read more data than was requested. Size of data read: " + currentTotalLength.get() + "."
+                                    + " Size of data requested: " + length
+                            );
+                        }
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        }).subscribeOn(Schedulers.elastic()); // We need to subscribe on a different scheduler to avoid blocking the io threads when
+                                              // we read the input stream (i.e. when it's rate limited)
+    }
+
+    /**
+     * Returns the number of parts of size {@code partSize} needed to reach {@code totalSize},
+     * along with the size of the last (or unique) part.
+     *
+     * @param totalSize the total size
+     * @param partSize  the part size
+     * @return a {@link Tuple} containing the number of parts to fill {@code totalSize} and
+     *         the size of the last part
+     */
+    static Tuple<Long, Long> numberOfMultiparts(final long totalSize, final long partSize) {
+        if (partSize <= 0) {
+            throw new IllegalArgumentException("Part size must be greater than zero");
+        }
+
+        if ((totalSize == 0L) || (totalSize <= partSize)) {
+            return Tuple.tuple(1L, totalSize);
+        }
+
+        final long parts = totalSize / partSize;
+        final long remaining = totalSize % partSize;
+
+        if (remaining == 0) {
+            return Tuple.tuple(parts, partSize);
+        } else {
+            return Tuple.tuple(parts + 1, remaining);
+        }
+    }
+
+    long getLargeBlobThresholdInBytes() {
+        return service.getSizeThresholdForMultiBlockUpload();
+    }
+
+    long getUploadBlockSize() {
+        return service.getUploadBlockSize();
     }
 
     private BlobServiceClient client() {
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
index 08e7c115f3129..51d1821a2d2d0 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java
@@ -68,7 +68,7 @@ class AzureClientProvider extends AbstractLifecycleComponent {
     private static final TimeValue DEFAULT_CONNECTION_TIMEOUT = TimeValue.timeValueSeconds(30);
     private static final TimeValue DEFAULT_MAX_CONNECTION_IDLE_TIME = TimeValue.timeValueSeconds(60);
     private static final int DEFAULT_MAX_CONNECTIONS = 50;
-    private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = Math.min(Runtime.getRuntime().availableProcessors(), 8) * 2;
+    private static final int DEFAULT_EVENT_LOOP_THREAD_COUNT = 1;
     private static final int PENDING_CONNECTION_QUEUE_SIZE = -1; // see ConnectionProvider.ConnectionPoolSpec.pendingAcquireMaxCount
 
     static final Setting<Integer> EVENT_LOOP_THREAD_COUNT = Setting.intSetting(
@@ -144,13 +144,12 @@ static AzureClientProvider create(ThreadPool threadPool, Settings settings) {
     }
 
     private static ByteBufAllocator createByteBufAllocator() {
-        int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
+        int nHeapArena = 1;
         int pageSize = PooledByteBufAllocator.defaultPageSize();
         int maxOrder = PooledByteBufAllocator.defaultMaxOrder();
         int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
         int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
         int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
-        boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
 
         return new PooledByteBufAllocator(false,
             nHeapArena,
@@ -160,7 +159,7 @@ private static ByteBufAllocator createByteBufAllocator() {
             tinyCacheSize,
             smallCacheSize,
             normalCacheSize,
-            useCacheForAllThreads);
+            false);
     }
 
     AzureBlobServiceClient createClient(AzureStorageSettings settings,
diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java
index 701186b6ffae8..afb4e183abaea 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java
@@ -29,6 +29,7 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.monitor.jvm.JvmInfo;
 
 import java.net.InetSocketAddress;
 import java.net.Proxy;
@@ -38,8 +39,6 @@
 import java.util.Map;
 import java.util.function.BiConsumer;
 
-import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
-import static com.azure.storage.blob.BlobClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
 import static java.util.Collections.emptyMap;
 
 public class AzureStorageService {
@@ -57,11 +56,23 @@ public class AzureStorageService {
      */
     public static final long MAX_BLOCK_NUMBER = 50000;
 
+    /**
+     * Default block size for multi-block uploads. The Azure repository will use the Put block and Put block list APIs to split the
+     * stream into several parts, each of block_size length, and will upload each part in its own request.
+     */
+    private static final ByteSizeValue DEFAULT_BLOCK_SIZE = new ByteSizeValue(
+        Math.max(
+            ByteSizeUnit.MB.toBytes(5), // minimum value
+            Math.min(
+                MAX_BLOCK_SIZE.getBytes(),
+                JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() / 20)),
+        ByteSizeUnit.BYTES);
+
     /**
      * The maximum size of a Block Blob.
      * See https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs
      */
-    public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * MAX_BLOCK_SIZE.getBytes();
+    public static final long MAX_BLOB_SIZE = MAX_BLOCK_NUMBER * DEFAULT_BLOCK_SIZE.getBytes();
 
     /**
      * Maximum allowed blob size in Azure blob store.
@@ -70,8 +81,7 @@ public class AzureStorageService {
 
     // see ModelHelper.BLOB_DEFAULT_MAX_SINGLE_UPLOAD_SIZE
     private static final long DEFAULT_MAX_SINGLE_UPLOAD_SIZE = new ByteSizeValue(256, ByteSizeUnit.MB).getBytes();
-    private static final long DEFAULT_UPLOAD_BLOCK_SIZE = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
-    private static final int DEFAULT_MAX_PARALLELISM = BLOB_DEFAULT_NUMBER_OF_BUFFERS;
+    private static final long DEFAULT_UPLOAD_BLOCK_SIZE = DEFAULT_BLOCK_SIZE.getBytes();
 
     // 'package' for testing
     volatile Map<String, AzureStorageSettings> storageSettings = emptyMap();
@@ -131,11 +141,6 @@ long getSizeThresholdForMultiBlockUpload() {
         return DEFAULT_MAX_SINGLE_UPLOAD_SIZE;
     }
 
-    // non-static, package private for testing
-    int getMaxUploadParallelism() {
-        return DEFAULT_MAX_PARALLELISM;
-    }
-
     int getMaxReadRetries(String clientName) {
         AzureStorageSettings azureStorageSettings = getClientSettings(clientName);
         return azureStorageSettings.getMaxRetries();
diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
index d70b7889ad803..9711963dd0097 100644
--- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
+++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
@@ -173,11 +173,6 @@ long getSizeThresholdForMultiBlockUpload() {
                 return ByteSizeUnit.MB.toBytes(1);
             }
 
-            @Override
-            int getMaxUploadParallelism() {
-                return 1;
-            }
-
             @Override
             int getMaxReadRetries(String clientName) {
                 return maxRetries;
@@ -415,6 +410,9 @@ public void testRetryUntilFail() throws IOException {
                 if (requestReceived.compareAndSet(false, true)) {
                     throw new AssertionError("Should not receive two requests");
                 } else {
+                    // We have to try to read the body since the netty async http client sends the request
+                    // lazily
+                    Streams.readFully(exchange.getRequestBody());
                    exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
                 }
             } finally {
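
Not part of the patch: the following is a minimal, self-contained sketch of the part-splitting arithmetic that the new numberOfMultiparts helper encodes above. The class name and the sizes used here are hypothetical and only illustrate how a blob larger than the upload block size is divided into full blocks plus a smaller final block.

// Illustrative only; mirrors the arithmetic of AzureBlobStore.numberOfMultiparts with made-up sizes.
public final class MultipartSplitExample {

    // Returns { number of parts, size of the last part } for a blob of totalSize
    // split into parts of at most partSize bytes.
    static long[] numberOfMultiparts(long totalSize, long partSize) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("Part size must be greater than zero");
        }
        if (totalSize == 0L || totalSize <= partSize) {
            return new long[] { 1L, totalSize };
        }
        final long parts = totalSize / partSize;
        final long remaining = totalSize % partSize;
        return remaining == 0 ? new long[] { parts, partSize } : new long[] { parts + 1, remaining };
    }

    public static void main(String[] args) {
        final long partSize = 64L * 1024 * 1024;  // hypothetical 64 MB upload block size
        final long blobSize = 200L * 1024 * 1024; // hypothetical 200 MB blob
        final long[] split = numberOfMultiparts(blobSize, partSize);
        // Prints "4 parts, last part 8388608 bytes": three full 64 MB blocks plus one 8 MB block,
        // which satisfies the patch's assertion blobSize == (nbParts - 1) * partSize + lastPartSize.
        System.out.println(split[0] + " parts, last part " + split[1] + " bytes");
    }
}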