Skip to content

Commit

Permalink
Download retriable stream structured message (Azure#44176)
Browse files Browse the repository at this point in the history
* blobs retriable structured message download

* test proxy

* testproxy

* remove commented code
  • Loading branch information
jaschrep-msft committed Aug 12, 2024
1 parent 5a532c6 commit b40939c
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 45 deletions.
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Blobs/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.Blobs",
"Tag": "net/storage/Azure.Storage.Blobs_efe2c4ee4f"
"Tag": "net/storage/Azure.Storage.Blobs_dcc7be748a"
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<Compile Include="$(AzureStorageSharedSources)StorageTelemetryPolicy.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StorageVersionExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessage.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageDecodingRetriableStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageDecodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageEncodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared" />
Expand Down
89 changes: 47 additions & 42 deletions sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1547,30 +1547,47 @@ internal virtual async ValueTask<Response<BlobDownloadStreamingResult>> Download
// Wrap the response Content in a RetriableStream so we
// can return it before it's finished downloading, but still
// allow retrying if it fails.
Stream stream = RetriableStream.Create(
response.Value.Content,
startOffset =>
StartDownloadAsync(
range,
conditionsWithEtag,
validationOptions,
startOffset,
async,
cancellationToken)
.EnsureCompleted()
.Value.Content,
async startOffset =>
(await StartDownloadAsync(
range,
conditionsWithEtag,
validationOptions,
startOffset,
async,
cancellationToken)
.ConfigureAwait(false))
.Value.Content,
ClientConfiguration.Pipeline.ResponseClassifier,
Constants.MaxReliabilityRetries);
ValueTask<Response<BlobDownloadStreamingResult>> Factory(long offset, bool forceStructuredMessage, bool async, CancellationToken cancellationToken)
=> StartDownloadAsync(
range,
conditionsWithEtag,
validationOptions,
offset,
forceStructuredMessage,
async,
cancellationToken);
async ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)> StructuredMessageFactory(
long offset, bool async, CancellationToken cancellationToken)
{
Response<BlobDownloadStreamingResult> result = await Factory(offset, forceStructuredMessage: true, async, cancellationToken).ConfigureAwait(false);
return StructuredMessageDecodingStream.WrapStream(result.Value.Content, result.Value.Details.ContentLength);
}
Stream stream;
if (response.GetRawResponse().Headers.Contains(Constants.StructuredMessage.CrcStructuredMessageHeader))
{
(Stream decodingStream, StructuredMessageDecodingStream.DecodedData decodedData) = StructuredMessageDecodingStream.WrapStream(
response.Value.Content, response.Value.Details.ContentLength);
stream = new StructuredMessageDecodingRetriableStream(
decodingStream,
decodedData,
startOffset => StructuredMessageFactory(startOffset, async: false, cancellationToken)
.EnsureCompleted(),
async startOffset => await StructuredMessageFactory(startOffset, async: true, cancellationToken)
.ConfigureAwait(false),
ClientConfiguration.Pipeline.ResponseClassifier,
Constants.MaxReliabilityRetries);
}
else
{
stream = RetriableStream.Create(
response.Value.Content,
startOffset => Factory(startOffset, forceStructuredMessage: false, async: false, cancellationToken)
.EnsureCompleted().Value.Content,
async startOffset => (await Factory(startOffset, forceStructuredMessage: false, async: true, cancellationToken)
.ConfigureAwait(false)).Value.Content,
ClientConfiguration.Pipeline.ResponseClassifier,
Constants.MaxReliabilityRetries);
}

stream = stream.WithNoDispose().WithProgress(progressHandler);

Expand Down Expand Up @@ -1645,6 +1662,9 @@ await ContentHasher.AssertResponseHashMatchInternal(
/// <param name="startOffset">
/// Starting offset to request - in the event of a retry.
/// </param>
/// <param name="forceStructuredMessage">
/// When using transactional CRC, force the request to use structured message.
/// </param>
/// <param name="async">
/// Whether to invoke the operation asynchronously.
/// </param>
Expand All @@ -1666,6 +1686,7 @@ private async ValueTask<Response<BlobDownloadStreamingResult>> StartDownloadAsyn
BlobRequestConditions conditions,
DownloadTransferValidationOptions validationOptions,
long startOffset = 0,
bool forceStructuredMessage = false, // TODO all CRC will force structured message in future
bool async = true,
CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -1702,7 +1723,7 @@ private async ValueTask<Response<BlobDownloadStreamingResult>> StartDownloadAsyn
rangeGetContentMD5 = true;
break;
case StorageChecksumAlgorithm.StorageCrc64:
if (pageRange?.Length <= Constants.StructuredMessage.MaxDownloadCrcWithHeader)
if (!forceStructuredMessage && pageRange?.Length <= Constants.StructuredMessage.MaxDownloadCrcWithHeader)
{
rangeGetContentCRC64 = true;
}
Expand Down Expand Up @@ -1757,24 +1778,8 @@ private async ValueTask<Response<BlobDownloadStreamingResult>> StartDownloadAsyn
long length = response.IsUnavailable() ? 0 : response.Headers.ContentLength ?? 0;
ClientConfiguration.Pipeline.LogTrace($"Response: {response.GetRawResponse().Status}, ContentLength: {length}");

BlobDownloadStreamingResult result = response.ToBlobDownloadStreamingResult();
if (response.GetRawResponse().Headers.TryGetValue(Constants.StructuredMessage.CrcStructuredMessageHeader, out string _) &&
response.GetRawResponse().Headers.TryGetValue(Constants.HeaderNames.ContentLength, out string rawContentLength))
{
(result.Content, _) = StructuredMessageDecodingStream.WrapStream(result.Content, long.Parse(rawContentLength));
}
// if not null, we expected a structured message response
// but we didn't find one in the above condition
else if (structuredBodyType != null)
{
// okay to throw here. due to 4MB checksum limit on service downloads, and how we don't
// request structured message until we exceed that, we are not throwing on a request
// that would have otherwise succeeded and still gotten the desired checksum
throw Errors.ExpectedStructuredMessage();
}

return Response.FromValue(
result,
response.ToBlobDownloadStreamingResult(),
response.GetRawResponse());
}
#endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,53 @@ public virtual async Task DownloadDisablesDefaultClientValidationOptions(
Assert.False(response.Headers.Contains("x-ms-content-crc64"));
Assert.IsTrue(dest.ToArray().SequenceEqual(data));
}

[Test]
public virtual async Task DownloadRecoversFromInterruptWithValidation(
[ValueSource(nameof(GetValidationAlgorithms))] StorageChecksumAlgorithm algorithm)
{
using var _ = AzureEventSourceListener.CreateConsoleLogger();
int dataLen = algorithm.ResolveAuto() switch {
StorageChecksumAlgorithm.StorageCrc64 => 5 * Constants.MB, // >4MB for multisegment
_ => Constants.KB,
};

await using IDisposingContainer<TContainerClient> disposingContainer = await GetDisposingContainerAsync();

// Arrange
var data = GetRandomBuffer(dataLen);

TClientOptions options = ClientBuilder.GetOptions();
options.AddPolicy(new FaultyDownloadPipelinePolicy(dataLen - 512, new IOException(), () => { }), HttpPipelinePosition.BeforeTransport);
var client = await GetResourceClientAsync(
disposingContainer.Container,
resourceLength: dataLen,
createResource: true,
options: options);
await SetupDataAsync(client, new MemoryStream(data));

var validationOptions = new DownloadTransferValidationOptions { ChecksumAlgorithm = algorithm };

// Act
var dest = new MemoryStream();
var response = await DownloadPartitionAsync(client, dest, validationOptions, new HttpRange(length: data.Length));

// Assert
// no policies this time; just check response headers
switch (algorithm.ResolveAuto())
{
case StorageChecksumAlgorithm.MD5:
Assert.True(response.Headers.Contains("Content-MD5"));
break;
case StorageChecksumAlgorithm.StorageCrc64:
Assert.True(response.Headers.Contains(Constants.StructuredMessage.CrcStructuredMessageHeader));
break;
default:
Assert.Fail("Test can't validate given algorithm type.");
break;
}
Assert.IsTrue(dest.ToArray().SequenceEqual(data));
}
#endregion

#region Auto-Algorithm Tests
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Files.DataLake/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.Files.DataLake",
"Tag": "net/storage/Azure.Storage.Files.DataLake_6d44446b20"
"Tag": "net/storage/Azure.Storage.Files.DataLake_9c23b9b180"
}
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Files.Shares/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.Files.Shares",
"Tag": "net/storage/Azure.Storage.Files.Shares_997e3d57ce"
"Tag": "net/storage/Azure.Storage.Files.Shares_5e5b51e54d"
}

0 comments on commit b40939c

Please sign in to comment.