Added support to set BlobParallelUploadOptions.computeMd5 so the service can perform an md5 verification. (Azure#15625)
gapra-msft authored Sep 28, 2020
1 parent 846265f commit f11ce68
Showing 11 changed files with 256 additions and 30 deletions.
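For orientation, a minimal usage sketch of the new option follows. It is not part of the commit; the connection string, container, and blob names are assumed placeholders:

```java
import com.azure.core.http.rest.Response;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlockBlobItem;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ComputeMd5Sample {
    public static void main(String[] args) {
        // Assumed setup: the environment variable and names are placeholders.
        BlobAsyncClient blobAsyncClient = new BlobServiceClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
            .buildAsyncClient()
            .getBlobContainerAsyncClient("my-container")
            .getBlobAsyncClient("my-blob");

        Flux<ByteBuffer> data = Flux.just(
            ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8)));

        // setComputeMd5(true) asks the library to compute an MD5 hash of each
        // request body and send it so the service can verify the payload.
        Response<BlockBlobItem> response = blobAsyncClient.uploadWithResponse(
            new BlobParallelUploadOptions(data).setComputeMd5(true)).block();

        System.out.println("Upload status: " + response.getStatusCode());
    }
}
```

As the diff below shows, the full-blob path forwards the hash via setContentMd5 while the chunked path sends a per-block hash to stageBlockWithResponse.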
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob-cryptography/CHANGELOG.md
@@ -1,6 +1,7 @@
# Release History

## 12.9.0-beta.1 (Unreleased)
+- Added support to set BlobParallelUploadOptions.computeMd5 so the service can perform an md5 verification.
- Added support to specify 'requiresEncryption' on the EncryptedBlobClientBuilder to specify whether or not to enforce that the blob is encrypted on download.
- Fixed a bug where the TokenCredential scope would be incorrect for custom URLs.
- Fixed a bug where a custom application id in HttpLogOptions would not be added to the User Agent String.
@@ -300,7 +300,8 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(BlobParallelUploadOption
return dataFinal.flatMap(df -> super.uploadWithResponse(new BlobParallelUploadOptions(df)
.setParallelTransferOptions(options.getParallelTransferOptions()).setHeaders(options.getHeaders())
.setMetadata(metadataFinal).setTags(options.getTags()).setTier(options.getTier())
-                .setRequestConditions(options.getRequestConditions())));
+                .setRequestConditions(options.getRequestConditions())
+                .setComputeMd5(options.isComputeMd5())));
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
@@ -18,6 +18,7 @@ import com.azure.storage.blob.models.DownloadRetryOptions
import com.azure.storage.blob.models.LeaseStateType
import com.azure.storage.blob.models.LeaseStatusType
import com.azure.storage.blob.models.ParallelTransferOptions
+import com.azure.storage.blob.options.BlobParallelUploadOptions
import com.azure.storage.blob.specialized.BlockBlobClient
import com.azure.storage.common.implementation.Constants
import com.microsoft.azure.storage.CloudStorageAccount
@@ -226,6 +227,29 @@ class EncyptedBlockBlobAPITest extends APISpec {
return compareListToBuffer(byteBufferList, outputByteBuffer)
}

+    @Unroll
+    @Requires({ liveMode() })
+    def "Encryption computeMd5"() {
+        setup:
+        def byteBufferList = []
+        for (def i = 0; i < byteBufferCount; i++) {
+            byteBufferList.add(getRandomData(size))
+        }
+        Flux<ByteBuffer> flux = Flux.fromIterable(byteBufferList)
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
+            .setMaxSingleUploadSizeLong(maxSingleUploadSize)
+            .setBlockSizeLong(blockSize)
+
+        expect:
+        beac.uploadWithResponse(new BlobParallelUploadOptions(flux).setParallelTransferOptions(parallelTransferOptions).setComputeMd5(true)).block().getStatusCode() == 201
+
+        where:
+        size         | maxSingleUploadSize | blockSize          | byteBufferCount
+        Constants.KB | null                | null               | 1    // Simple case where uploadFull is called.
+        Constants.KB | Constants.KB        | 500 * Constants.KB | 1000 // uploadChunked 2 blocks staged
+        Constants.KB | Constants.KB        | 5 * Constants.KB   | 1000 // uploadChunked 100 blocks staged
+    }

// This test checks that HTTP headers are successfully set on the encrypted client
@Unroll
def "Encryption HTTP headers"() {
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
@@ -1,6 +1,7 @@
# Release History

## 12.9.0-beta.1 (Unreleased)
+- Added support to set BlobParallelUploadOptions.computeMd5 so the service can perform an md5 verification.
- Added support to specify block size when using BlobInputStream.

- Fixed a bug where users could not download more than 5000MB of data in one shot in the downloadToFile API.
@@ -11,16 +11,16 @@
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
-import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
-import com.azure.storage.blob.options.BlobUploadFromFileOptions;
-import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
import com.azure.storage.blob.models.BlockBlobItem;
-import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.CustomerProvidedKey;
import com.azure.storage.blob.models.ParallelTransferOptions;
+import com.azure.storage.blob.options.BlobParallelUploadOptions;
+import com.azure.storage.blob.options.BlobUploadFromFileOptions;
+import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.specialized.AppendBlobAsyncClient;
import com.azure.storage.blob.specialized.BlobAsyncClientBase;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
@@ -419,41 +419,61 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(BlobParallelUploadOption
try {
StorageImplUtils.assertNotNull("options", options);

-            BlobRequestConditions validatedRequestConditions = options.getRequestConditions() == null
-                ? new BlobRequestConditions() : options.getRequestConditions();
-            final ParallelTransferOptions validatedParallelTransferOptions =
+            final ParallelTransferOptions parallelTransferOptions =
ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions());
+            final BlobHttpHeaders headers = options.getHeaders();
+            final Map<String, String> metadata = options.getMetadata();
+            final Map<String, String> tags = options.getTags();
+            final AccessTier tier = options.getTier();
+            final BlobRequestConditions requestConditions = options.getRequestConditions() == null
+                ? new BlobRequestConditions() : options.getRequestConditions();
+            final boolean computeMd5 = options.isComputeMd5();

BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient();

Function<Flux<ByteBuffer>, Mono<Response<BlockBlobItem>>> uploadInChunksFunction = (stream) ->
-                uploadInChunks(blockBlobAsyncClient, stream, validatedParallelTransferOptions,
-                    options.getHeaders(), options.getMetadata(), options.getTags(),
-                    options.getTier(), validatedRequestConditions);
+                uploadInChunks(blockBlobAsyncClient, stream, parallelTransferOptions, headers, metadata, tags,
+                    tier, requestConditions, computeMd5);

-            BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlobMethod =
-                (stream, length) -> blockBlobAsyncClient.uploadWithResponse(new BlockBlobSimpleUploadOptions(
-                    ProgressReporter.addProgressReporting(stream,
-                        validatedParallelTransferOptions.getProgressReceiver()), length)
-                    .setHeaders(options.getHeaders()).setMetadata(options.getMetadata()).setTags(options.getTags())
-                    .setTier(options.getTier()).setRequestConditions(options.getRequestConditions()));
+            BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlobFunction =
+                (stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions,
+                    headers, metadata, tags, tier, requestConditions, computeMd5);

Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getLength(),
// We can only buffer up to max int due to restrictions in ByteBuffer.
-                (int) Math.min(Integer.MAX_VALUE, validatedParallelTransferOptions.getBlockSizeLong()))
+                (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()))
: options.getDataFlux();
-            return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(validatedParallelTransferOptions),
-                uploadInChunksFunction, uploadFullBlobMethod);
+            return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(parallelTransferOptions),
+                uploadInChunksFunction, uploadFullBlobFunction);
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
}

+    private Mono<Response<BlockBlobItem>> uploadFullBlob(BlockBlobAsyncClient blockBlobAsyncClient,
+        Flux<ByteBuffer> data, long length, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
+        Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
+        BlobRequestConditions requestConditions, boolean computeMd5) {
+        // Report progress as necessary.
+        Flux<ByteBuffer> progressData = ProgressReporter.addProgressReporting(data,
+            parallelTransferOptions.getProgressReceiver());
+
+        return UploadUtils.computeMd5(progressData, computeMd5, logger)
+            .map(fluxMd5Wrapper -> new BlockBlobSimpleUploadOptions(fluxMd5Wrapper.getData(), length)
+                .setHeaders(headers)
+                .setMetadata(metadata)
+                .setTags(tags)
+                .setTier(tier)
+                .setRequestConditions(requestConditions)
+                .setContentMd5(fluxMd5Wrapper.getMd5()))
+            .flatMap(blockBlobAsyncClient::uploadWithResponse);
+    }

private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient,
Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
-        BlobRequestConditions requestConditions) {
+        BlobRequestConditions requestConditions, boolean computeMd5) {
// TODO: Sample/api reference
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong();
@@ -483,10 +503,11 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
progressLock,
totalProgress);

-                    final String blockId = Base64.getEncoder().encodeToString(
-                        UUID.randomUUID().toString().getBytes(UTF_8));
-                    return blockBlobAsyncClient.stageBlockWithResponse(blockId, progressData, bufferAggregator.length(),
-                        null, requestConditions.getLeaseId())
+                    final String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(UTF_8));
+                    return UploadUtils.computeMd5(progressData, computeMd5, logger)
+                        .flatMap(fluxMd5Wrapper -> blockBlobAsyncClient.stageBlockWithResponse(blockId,
+                            fluxMd5Wrapper.getData(), bufferAggregator.length(), fluxMd5Wrapper.getMd5(),
+                            requestConditions.getLeaseId()))
// We only care about the stageBlock insofar as it was successful,
// but we need to collect the ids.
.map(x -> blockId)
@@ -32,6 +32,7 @@ public class BlobParallelUploadOptions {
private Map<String, String> tags;
private AccessTier tier;
private BlobRequestConditions requestConditions;
+    private boolean computeMd5;
private Duration timeout;

/**
@@ -214,6 +215,25 @@ public BlobParallelUploadOptions setRequestConditions(BlobRequestConditions requ
return this;
}

+    /**
+     * @return Whether or not the library should calculate the md5 and send it for the service to verify.
+     */
+    public boolean isComputeMd5() {
+        return computeMd5;
+    }
+
+    /**
+     * Sets the computeMd5 property.
+     *
+     * @param computeMd5 Whether or not the library should calculate the md5 and send it for the service to
+     * verify.
+     * @return The updated options.
+     */
+    public BlobParallelUploadOptions setComputeMd5(boolean computeMd5) {
+        this.computeMd5 = computeMd5;
+        return this;
+    }

/**
* Gets the timeout.
*
@@ -19,14 +19,14 @@
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
-import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
import com.azure.storage.blob.models.BlockBlobItem;
-import com.azure.storage.blob.options.BlockBlobListBlocksOptions;
-import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.models.BlockList;
import com.azure.storage.blob.models.BlockListType;
import com.azure.storage.blob.models.BlockLookupList;
import com.azure.storage.blob.models.CpkInfo;
+import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
+import com.azure.storage.blob.options.BlockBlobListBlocksOptions;
+import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
@@ -235,8 +235,7 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data, l
* @param options {@link BlockBlobSimpleUploadOptions}
* @return A reactive response containing the information of the uploaded block blob.
*/
-    public Mono<Response<BlockBlobItem>> uploadWithResponse(
-        BlockBlobSimpleUploadOptions options) {
+    public Mono<Response<BlockBlobItem>> uploadWithResponse(BlockBlobSimpleUploadOptions options) {
try {
return withContext(context -> uploadWithResponse(options, context));
} catch (RuntimeException ex) {
@@ -1225,6 +1225,29 @@ class BlockBlobAPITest extends APISpec {
10 * Constants.MB | 3 * Constants.MB | 3 || 4 // Data does not squarely fit in buffers.
}

+    @Unroll
+    @Requires({ liveMode() })
+    def "Async buffered upload computeMd5"() {
+        setup:
+        def byteBufferList = []
+        for (def i = 0; i < byteBufferCount; i++) {
+            byteBufferList.add(getRandomData(size))
+        }
+        Flux<ByteBuffer> flux = Flux.fromIterable(byteBufferList)
+        ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
+            .setMaxSingleUploadSizeLong(maxSingleUploadSize)
+            .setBlockSizeLong(blockSize)
+
+        expect:
+        blobAsyncClient.uploadWithResponse(new BlobParallelUploadOptions(flux).setParallelTransferOptions(parallelTransferOptions).setComputeMd5(true)).block().getStatusCode() == 201
+
+        where:
+        size         | maxSingleUploadSize | blockSize          | byteBufferCount
+        Constants.KB | null                | null               | 1    // Simple case where uploadFull is called.
+        Constants.KB | Constants.KB        | 500 * Constants.KB | 1000 // uploadChunked 2 blocks staged
+        Constants.KB | Constants.KB        | 5 * Constants.KB   | 1000 // uploadChunked 100 blocks staged
+    }

def compareListToBuffer(List<ByteBuffer> buffers, ByteBuffer result) {
result.position(0)
for (ByteBuffer buffer : buffers) {
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-common/CHANGELOG.md
@@ -2,6 +2,7 @@

## 12.9.0-beta.1 (Unreleased)
- Added a Constant that represented the default storage scope for TokenCredentials.
+- Added UploadUtils.computeMd5 that computes the md5 of a flux and wraps it with the data.

## 12.8.0 (2020-08-13)
- Added support for setting tags and filterTags operations on SAS by adding to AccountSASPermissions.
@@ -4,6 +4,8 @@
package com.azure.storage.common.implementation;

import com.azure.core.http.rest.Response;
+import com.azure.core.util.CoreUtils;
+import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.ParallelTransferOptions;
import reactor.core.publisher.Flux;
@@ -15,9 +17,13 @@
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.function.BiFunction;
import java.util.function.Function;

+import static com.azure.core.util.FluxUtil.monoError;

/**
* This class provides helper methods for buffered upload.
*
@@ -142,4 +148,48 @@ public static void uploadFileCleanup(AsynchronousFileChannel channel, ClientLogg
throw logger.logExceptionAsError(new UncheckedIOException(e));
}
}

+    /**
+     * Computes the md5 of the data and wraps it with the data.
+     *
+     * @param data The data.
+     * @param computeMd5 Whether or not to compute the md5.
+     * @param logger Logger to log errors.
+     * @return The data wrapped with its md5.
+     */
+    public static Mono<FluxMd5Wrapper> computeMd5(Flux<ByteBuffer> data, boolean computeMd5, ClientLogger logger) {
+        if (computeMd5) {
+            try {
+                return data.reduce(MessageDigest.getInstance("MD5"), (digest, buffer) -> {
+                    int position = buffer.position();
+                    byte[] bytes = FluxUtil.byteBufferToArray(buffer);
+                    digest.update(bytes, 0, bytes.length);
+                    buffer.position(position);
+                    return digest;
+                }).map(messageDigest -> new FluxMd5Wrapper(data, messageDigest.digest()));
+            } catch (NoSuchAlgorithmException e) {
+                return monoError(logger, new RuntimeException(e));
+            }
+        } else {
+            return Mono.just(new FluxMd5Wrapper(data, null));
+        }
+    }
+
+    public static class FluxMd5Wrapper {
+        private final Flux<ByteBuffer> data;
+        private final byte[] md5;
+
+        FluxMd5Wrapper(Flux<ByteBuffer> data, byte[] md5) {
+            this.data = data;
+            this.md5 = CoreUtils.clone(md5);
+        }
+
+        public Flux<ByteBuffer> getData() {
+            return data;
+        }
+
+        public byte[] getMd5() {
+            return CoreUtils.clone(md5);
+        }
+    }
}
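The helper above folds every buffer into a single MessageDigest, restoring each buffer's position so the data stays readable for the upload, then re-emits the original flux alongside the finished digest. Below is a standalone sketch of the same pattern; the class and method names are illustrative, not SDK API:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.security.MessageDigest;

public final class Md5OverFlux {
    // Folds a stream of buffers into one MD5 digest. Each buffer's read
    // position is saved and restored so the buffers stay readable downstream.
    public static Mono<byte[]> md5(Flux<ByteBuffer> data) {
        return Mono.fromCallable(() -> MessageDigest.getInstance("MD5"))
            .flatMap(initial -> data.reduce(initial, (digest, buffer) -> {
                int position = buffer.position();   // remember where reading started
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);                  // get(...) advances the position...
                buffer.position(position);          // ...so rewind it afterwards
                digest.update(bytes);
                return digest;
            }))
            .map(MessageDigest::digest);
    }
}
```

Note the implicit requirement: the flux must be subscribable a second time for the actual upload, since the digest pass consumes one subscription. The chunked path in this commit satisfies that by aggregating each block into buffers before staging it.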