diff --git a/sdk/storage/azure-storage-blob-cryptography/CHANGELOG.md b/sdk/storage/azure-storage-blob-cryptography/CHANGELOG.md index 3ca4d650819aa..4a96dea2532ed 100644 --- a/sdk/storage/azure-storage-blob-cryptography/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob-cryptography/CHANGELOG.md @@ -1,6 +1,7 @@ # Release History ## 12.9.0-beta.1 (Unreleased) +- Added support to set BlobParallelUploadOptions.computeMd5 so the service can perform an md5 verification. - Added support to specify 'requiresEncryption' on the EncryptedBlobClientBuilder to specify whether or not to enforce that the blob is encrypted on download. - Fixed a bug where the TokenCredential scope would be incorrect for custom URLs. - Fixed a bug where a custom application id in HttpLogOptions would not be added to the User Agent String. diff --git a/sdk/storage/azure-storage-blob-cryptography/src/main/java/com/azure/storage/blob/specialized/cryptography/EncryptedBlobAsyncClient.java b/sdk/storage/azure-storage-blob-cryptography/src/main/java/com/azure/storage/blob/specialized/cryptography/EncryptedBlobAsyncClient.java index e19ac54fc828b..64dd3f1237609 100644 --- a/sdk/storage/azure-storage-blob-cryptography/src/main/java/com/azure/storage/blob/specialized/cryptography/EncryptedBlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob-cryptography/src/main/java/com/azure/storage/blob/specialized/cryptography/EncryptedBlobAsyncClient.java @@ -300,7 +300,8 @@ public Mono> uploadWithResponse(BlobParallelUploadOption return dataFinal.flatMap(df -> super.uploadWithResponse(new BlobParallelUploadOptions(df) .setParallelTransferOptions(options.getParallelTransferOptions()).setHeaders(options.getHeaders()) .setMetadata(metadataFinal).setTags(options.getTags()).setTier(options.getTier()) - .setRequestConditions(options.getRequestConditions()))); + .setRequestConditions(options.getRequestConditions()) + .setComputeMd5(options.isComputeMd5()))); } catch (RuntimeException ex) { return monoError(logger, ex); } diff --git a/sdk/storage/azure-storage-blob-cryptography/src/test/java/com/azure/storage/blob/specialized/cryptography/EncyptedBlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob-cryptography/src/test/java/com/azure/storage/blob/specialized/cryptography/EncyptedBlockBlobAPITest.groovy index edf8b061555c0..d9d428bdf3c21 100644 --- a/sdk/storage/azure-storage-blob-cryptography/src/test/java/com/azure/storage/blob/specialized/cryptography/EncyptedBlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob-cryptography/src/test/java/com/azure/storage/blob/specialized/cryptography/EncyptedBlockBlobAPITest.groovy @@ -18,6 +18,7 @@ import com.azure.storage.blob.models.DownloadRetryOptions import com.azure.storage.blob.models.LeaseStateType import com.azure.storage.blob.models.LeaseStatusType import com.azure.storage.blob.models.ParallelTransferOptions +import com.azure.storage.blob.options.BlobParallelUploadOptions import com.azure.storage.blob.specialized.BlockBlobClient import com.azure.storage.common.implementation.Constants import com.microsoft.azure.storage.CloudStorageAccount @@ -226,6 +227,29 @@ class EncyptedBlockBlobAPITest extends APISpec { return compareListToBuffer(byteBufferList, outputByteBuffer) } + @Unroll + @Requires({ liveMode() }) + def "Encryption computeMd5"() { + setup: + def byteBufferList = [] + for (def i = 0; i < byteBufferCount; i++) { + byteBufferList.add(getRandomData(size)) + } + Flux flux = Flux.fromIterable(byteBufferList) + ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions() + .setMaxSingleUploadSizeLong(maxSingleUploadSize) + .setBlockSizeLong(blockSize) + + expect: + beac.uploadWithResponse(new BlobParallelUploadOptions(flux).setParallelTransferOptions(parallelTransferOptions).setComputeMd5(true)).block().getStatusCode() == 201 + + where: + size | maxSingleUploadSize | blockSize | byteBufferCount + Constants.KB | null | null | 1 // Simple case where uploadFull is called. + Constants.KB | Constants.KB | 500 * Constants.KB | 1000 // uploadChunked 2 blocks staged + Constants.KB | Constants.KB | 5 * Constants.KB | 1000 // uploadChunked 100 blocks staged + } + // This test checks that HTTP headers are successfully set on the encrypted client @Unroll def "Encryption HTTP headers"() { diff --git a/sdk/storage/azure-storage-blob/CHANGELOG.md b/sdk/storage/azure-storage-blob/CHANGELOG.md index ff7b58f347c6d..bb44a1e2679f5 100644 --- a/sdk/storage/azure-storage-blob/CHANGELOG.md +++ b/sdk/storage/azure-storage-blob/CHANGELOG.md @@ -1,6 +1,7 @@ # Release History ## 12.9.0-beta.1 (Unreleased) +- Added support to set BlobParallelUploadOptions.computeMd5 so the service can perform an md5 verification. - Added support to specify block size when using BlobInputStream. - Fixed a bug where users could not download more than 5000MB of data in one shot in the downloadToFile API. diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java index 7410eac63e0ab..96bb8bbd54686 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/BlobAsyncClient.java @@ -11,16 +11,16 @@ import com.azure.storage.blob.implementation.util.ModelHelper; import com.azure.storage.blob.models.AccessTier; import com.azure.storage.blob.models.BlobHttpHeaders; -import com.azure.storage.blob.options.BlobParallelUploadOptions; import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.BlobRequestConditions; -import com.azure.storage.blob.options.BlobUploadFromFileOptions; -import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; import com.azure.storage.blob.models.BlockBlobItem; -import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; import com.azure.storage.blob.models.CpkInfo; import com.azure.storage.blob.models.CustomerProvidedKey; import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlobParallelUploadOptions; +import com.azure.storage.blob.options.BlobUploadFromFileOptions; +import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; +import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; import com.azure.storage.blob.specialized.AppendBlobAsyncClient; import com.azure.storage.blob.specialized.BlobAsyncClientBase; import com.azure.storage.blob.specialized.BlockBlobAsyncClient; @@ -419,41 +419,61 @@ public Mono> uploadWithResponse(BlobParallelUploadOption try { StorageImplUtils.assertNotNull("options", options); - BlobRequestConditions validatedRequestConditions = options.getRequestConditions() == null - ? new BlobRequestConditions() : options.getRequestConditions(); - final ParallelTransferOptions validatedParallelTransferOptions = + final ParallelTransferOptions parallelTransferOptions = ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions()); + final BlobHttpHeaders headers = options.getHeaders(); + final Map metadata = options.getMetadata(); + final Map tags = options.getTags(); + final AccessTier tier = options.getTier(); + final BlobRequestConditions requestConditions = options.getRequestConditions() == null + ? new BlobRequestConditions() : options.getRequestConditions(); + final boolean computeMd5 = options.isComputeMd5(); BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient(); Function, Mono>> uploadInChunksFunction = (stream) -> - uploadInChunks(blockBlobAsyncClient, stream, validatedParallelTransferOptions, - options.getHeaders(), options.getMetadata(), options.getTags(), - options.getTier(), validatedRequestConditions); + uploadInChunks(blockBlobAsyncClient, stream, parallelTransferOptions, headers, metadata, tags, + tier, requestConditions, computeMd5); - BiFunction, Long, Mono>> uploadFullBlobMethod = - (stream, length) -> blockBlobAsyncClient.uploadWithResponse(new BlockBlobSimpleUploadOptions( - ProgressReporter.addProgressReporting(stream, - validatedParallelTransferOptions.getProgressReceiver()), length) - .setHeaders(options.getHeaders()).setMetadata(options.getMetadata()).setTags(options.getTags()) - .setTier(options.getTier()).setRequestConditions(options.getRequestConditions())); + BiFunction, Long, Mono>> uploadFullBlobFunction = + (stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions, + headers, metadata, tags, tier, requestConditions, computeMd5); Flux data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer( options.getDataStream(), options.getLength(), // We can only buffer up to max int due to restrictions in ByteBuffer. - (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong())) + (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong())) : options.getDataFlux(); - return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(validatedParallelTransferOptions), - uploadInChunksFunction, uploadFullBlobMethod); + return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(parallelTransferOptions), + uploadInChunksFunction, uploadFullBlobFunction); } catch (RuntimeException ex) { return monoError(logger, ex); } } + private Mono> uploadFullBlob(BlockBlobAsyncClient blockBlobAsyncClient, + Flux data, long length, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, + Map metadata, Map tags, AccessTier tier, + BlobRequestConditions requestConditions, boolean computeMd5) { + // Report progress as necessary. + Flux progressData = ProgressReporter.addProgressReporting(data, + parallelTransferOptions.getProgressReceiver()); + + return UploadUtils.computeMd5(progressData, computeMd5, logger) + .map(fluxMd5Wrapper -> new BlockBlobSimpleUploadOptions(fluxMd5Wrapper.getData(), length) + .setHeaders(headers) + .setMetadata(metadata) + .setTags(tags) + .setTier(tier) + .setRequestConditions(requestConditions) + .setContentMd5(fluxMd5Wrapper.getMd5())) + .flatMap(blockBlobAsyncClient::uploadWithResponse); + } + private Mono> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient, Flux data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, Map metadata, Map tags, AccessTier tier, - BlobRequestConditions requestConditions) { + BlobRequestConditions requestConditions, boolean computeMd5) { // TODO: Sample/api reference // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong. AtomicLong totalProgress = new AtomicLong(); @@ -483,10 +503,11 @@ private Mono> uploadInChunks(BlockBlobAsyncClient blockB progressLock, totalProgress); - final String blockId = Base64.getEncoder().encodeToString( - UUID.randomUUID().toString().getBytes(UTF_8)); - return blockBlobAsyncClient.stageBlockWithResponse(blockId, progressData, bufferAggregator.length(), - null, requestConditions.getLeaseId()) + final String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(UTF_8)); + return UploadUtils.computeMd5(progressData, computeMd5, logger) + .flatMap(fluxMd5Wrapper -> blockBlobAsyncClient.stageBlockWithResponse(blockId, + fluxMd5Wrapper.getData(), bufferAggregator.length(), fluxMd5Wrapper.getMd5(), + requestConditions.getLeaseId())) // We only care about the stageBlock insofar as it was successful, // but we need to collect the ids. .map(x -> blockId) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java index 90b4f74dfc829..19d37d72a8912 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/options/BlobParallelUploadOptions.java @@ -32,6 +32,7 @@ public class BlobParallelUploadOptions { private Map tags; private AccessTier tier; private BlobRequestConditions requestConditions; + private boolean computeMd5; private Duration timeout; /** @@ -214,6 +215,25 @@ public BlobParallelUploadOptions setRequestConditions(BlobRequestConditions requ return this; } + /** + * @return Whether or not the library should calculate the md5 and send it for the service to verify. + */ + public boolean isComputeMd5() { + return computeMd5; + } + + /** + * Sets the computeMd5 property. + * + * @param computeMd5 Whether or not the library should calculate the md5 and send it for the service to + * verify. + * @return The updated options. + */ + public BlobParallelUploadOptions setComputeMd5(boolean computeMd5) { + this.computeMd5 = computeMd5; + return this; + } + /** * Gets the timeout. * diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobAsyncClient.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobAsyncClient.java index c1c0c17540ef5..60f0b27dd496a 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobAsyncClient.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlockBlobAsyncClient.java @@ -19,14 +19,14 @@ import com.azure.storage.blob.models.BlobHttpHeaders; import com.azure.storage.blob.models.BlobRange; import com.azure.storage.blob.models.BlobRequestConditions; -import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; import com.azure.storage.blob.models.BlockBlobItem; -import com.azure.storage.blob.options.BlockBlobListBlocksOptions; -import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; import com.azure.storage.blob.models.BlockList; import com.azure.storage.blob.models.BlockListType; import com.azure.storage.blob.models.BlockLookupList; import com.azure.storage.blob.models.CpkInfo; +import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions; +import com.azure.storage.blob.options.BlockBlobListBlocksOptions; +import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; import com.azure.storage.common.Utility; import com.azure.storage.common.implementation.Constants; import com.azure.storage.common.implementation.StorageImplUtils; @@ -235,8 +235,7 @@ public Mono> uploadWithResponse(Flux data, l * @param options {@link BlockBlobSimpleUploadOptions} * @return A reactive response containing the information of the uploaded block blob. */ - public Mono> uploadWithResponse( - BlockBlobSimpleUploadOptions options) { + public Mono> uploadWithResponse(BlockBlobSimpleUploadOptions options) { try { return withContext(context -> uploadWithResponse(options, context)); } catch (RuntimeException ex) { diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy index 12294910da03a..62b3280dec178 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/specialized/BlockBlobAPITest.groovy @@ -1225,6 +1225,29 @@ class BlockBlobAPITest extends APISpec { 10 * Constants.MB | 3 * Constants.MB | 3 || 4 // Data does not squarely fit in buffers. } + @Unroll + @Requires({ liveMode() }) + def "Async buffered upload computeMd5"() { + setup: + def byteBufferList = [] + for (def i = 0; i < byteBufferCount; i++) { + byteBufferList.add(getRandomData(size)) + } + Flux flux = Flux.fromIterable(byteBufferList) + ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions() + .setMaxSingleUploadSizeLong(maxSingleUploadSize) + .setBlockSizeLong(blockSize) + + expect: + blobAsyncClient.uploadWithResponse(new BlobParallelUploadOptions(flux).setParallelTransferOptions(parallelTransferOptions).setComputeMd5(true)).block().getStatusCode() == 201 + + where: + size | maxSingleUploadSize | blockSize | byteBufferCount + Constants.KB | null | null | 1 // Simple case where uploadFull is called. + Constants.KB | Constants.KB | 500 * Constants.KB | 1000 // uploadChunked 2 blocks staged + Constants.KB | Constants.KB | 5 * Constants.KB | 1000 // uploadChunked 100 blocks staged + } + def compareListToBuffer(List buffers, ByteBuffer result) { result.position(0) for (ByteBuffer buffer : buffers) { diff --git a/sdk/storage/azure-storage-common/CHANGELOG.md b/sdk/storage/azure-storage-common/CHANGELOG.md index fe5b7771f2a13..c6fef516ec62a 100644 --- a/sdk/storage/azure-storage-common/CHANGELOG.md +++ b/sdk/storage/azure-storage-common/CHANGELOG.md @@ -2,6 +2,7 @@ ## 12.9.0-beta.1 (Unreleased) - Added a Constant that represented the default storage scope for TokenCredentials. +- Added UploadUtils.computeMd5 that computes the md5 of a flux and wraps it with the data. ## 12.8.0 (2020-08-13) - Added support for setting tags and filterTags operations on SAS by adding to AccountSASPermissions. diff --git a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java index 1aa1ab06f2318..f7bd085bacb12 100644 --- a/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java +++ b/sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/UploadUtils.java @@ -4,6 +4,8 @@ package com.azure.storage.common.implementation; import com.azure.core.http.rest.Response; +import com.azure.core.util.CoreUtils; +import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.storage.common.ParallelTransferOptions; import reactor.core.publisher.Flux; @@ -15,9 +17,13 @@ import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.function.BiFunction; import java.util.function.Function; +import static com.azure.core.util.FluxUtil.monoError; + /** * This class provides helper methods for buffered upload. * @@ -142,4 +148,48 @@ public static void uploadFileCleanup(AsynchronousFileChannel channel, ClientLogg throw logger.logExceptionAsError(new UncheckedIOException(e)); } } + + /** + * Computes the md5 of the data and wraps it with the data. + * + * @param data The data. + * @param computeMd5 Whether or not to compute the md5. + * @param logger Logger to log errors. + * @return The data wrapped with its md5. + */ + public static Mono computeMd5(Flux data, boolean computeMd5, ClientLogger logger) { + if (computeMd5) { + try { + return data.reduce(MessageDigest.getInstance("MD5"), (digest, buffer) -> { + int position = buffer.position(); + byte[] bytes = FluxUtil.byteBufferToArray(buffer); + digest.update(bytes, 0, bytes.length); + buffer.position(position); + return digest; + }).map(messageDigest -> new FluxMd5Wrapper(data, messageDigest.digest())); + } catch (NoSuchAlgorithmException e) { + return monoError(logger, new RuntimeException(e)); + } + } else { + return Mono.just(new FluxMd5Wrapper(data, null)); + } + } + + public static class FluxMd5Wrapper { + private final Flux data; + private final byte[] md5; + + FluxMd5Wrapper(Flux data, byte[] md5) { + this.data = data; + this.md5 = CoreUtils.clone(md5); + } + + public Flux getData() { + return data; + } + + public byte[] getMd5() { + return CoreUtils.clone(md5); + } + } } diff --git a/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/UploadUtilsTest.groovy b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/UploadUtilsTest.groovy new file mode 100644 index 0000000000000..5d2390128fef0 --- /dev/null +++ b/sdk/storage/azure-storage-common/src/test/java/com/azure/storage/common/implementation/UploadUtilsTest.groovy @@ -0,0 +1,85 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.storage.common.implementation + +import com.azure.core.util.FluxUtil +import com.azure.core.util.logging.ClientLogger +import reactor.core.publisher.Flux +import reactor.test.StepVerifier +import spock.lang.Specification +import spock.lang.Unroll + +import java.nio.ByteBuffer +import java.security.MessageDigest + +class UploadUtilsTest extends Specification { + + @Unroll + def "computeMd5 md5"() { + setup: + def md5 = MessageDigest.getInstance("MD5").digest("Hello World!".getBytes()) + def flux = Flux.fromIterable(data.stream().map({ str -> ByteBuffer.wrap(str.getBytes())}) as Iterable) + + when: "computeMd5 = true" + def sv = StepVerifier.create(UploadUtils.computeMd5(flux, true, new ClientLogger(UploadUtilsTest.class))) + + then: + sv.expectNextMatches({ w -> w.getMd5() == md5 }) + .expectComplete() + + when: "computeMd5 = false" + sv = StepVerifier.create(UploadUtils.computeMd5(flux, false, new ClientLogger(UploadUtilsTest.class))) + + then: + sv.expectNextMatches({ w -> w.getMd5() == null }) + .expectComplete() + + where: + data || _ + ["Hello World!"] || _ + ["Hello ", "World!"] || _ + ["H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"] || _ + ["Hel", "lo World!"] || _ + } + + @Unroll + def "computeMd5 data"() { // This test checks that we maintain the integrity of data when we reset the buffers in the compute md5 calculation. + setup: + def flux = Flux.fromIterable(data.stream().map({ str -> ByteBuffer.wrap(str.getBytes())}) as Iterable) + + when: "computeMd5 = true" + def sv = StepVerifier.create( + UploadUtils.computeMd5(flux, true, new ClientLogger(UploadUtilsTest.class)) + .flatMapMany({ wrapper -> wrapper.getData() }) + .reduce(new StringBuilder(), { sb, buffer -> + sb.append(FluxUtil.byteBufferToArray(buffer)) + return sb + }).map( { sb -> sb.toString()} )) + + then: + sv.expectNext("Hello World!") + .expectComplete() + + when: "computeMd5 = false" + sv = StepVerifier.create( + UploadUtils.computeMd5(flux, false, new ClientLogger(UploadUtilsTest.class)) + .flatMapMany({ wrapper -> wrapper.getData() }) + .reduce(new StringBuilder(), { sb, buffer -> + sb.append(FluxUtil.byteBufferToArray(buffer)) + return sb + }).map( { sb -> sb.toString()} )) + + then: + sv.expectNext("Hello World!") + .expectComplete() + + + where: + data || _ + ["Hello World!"] || _ + ["Hello ", "World!"] || _ + ["H", "e", "l", "l", "o", " ", "W", "o", "r", "l", "d", "!"] || _ + ["Hel", "lo World!"] || _ + } +}