From b40939c967a055f761555cb392cbf07b4c3d42cc Mon Sep 17 00:00:00 2001 From: Jocelyn <41338290+jaschrep-msft@users.noreply.github.com> Date: Fri, 24 May 2024 13:01:47 -0400 Subject: [PATCH] Download retriable stream structured message (#44176) * blobs retriable structured message download * test proxy * testproxy * remove commented code --- sdk/storage/Azure.Storage.Blobs/assets.json | 2 +- .../src/Azure.Storage.Blobs.csproj | 1 + .../Azure.Storage.Blobs/src/BlobBaseClient.cs | 89 ++++++++++--------- .../Shared/TransferValidationTestBase.cs | 47 ++++++++++ .../Azure.Storage.Files.DataLake/assets.json | 2 +- .../Azure.Storage.Files.Shares/assets.json | 2 +- 6 files changed, 98 insertions(+), 45 deletions(-) diff --git a/sdk/storage/Azure.Storage.Blobs/assets.json b/sdk/storage/Azure.Storage.Blobs/assets.json index 700e2c053c45b..f659bfa082944 100644 --- a/sdk/storage/Azure.Storage.Blobs/assets.json +++ b/sdk/storage/Azure.Storage.Blobs/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.Blobs", - "Tag": "net/storage/Azure.Storage.Blobs_efe2c4ee4f" + "Tag": "net/storage/Azure.Storage.Blobs_dcc7be748a" } diff --git a/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj b/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj index 11a4fdff8aba4..527ebfabde810 100644 --- a/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj +++ b/sdk/storage/Azure.Storage.Blobs/src/Azure.Storage.Blobs.csproj @@ -92,6 +92,7 @@ + diff --git a/sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs b/sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs index d9708c01e8d4f..bb591b8d3c0e9 100644 --- a/sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs +++ b/sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs @@ -1547,30 +1547,47 @@ internal virtual async ValueTask> Download // Wrap the response Content in a RetriableStream so we // can return it before it's finished downloading, but still // allow retrying if it fails. - Stream stream = RetriableStream.Create( - response.Value.Content, - startOffset => - StartDownloadAsync( - range, - conditionsWithEtag, - validationOptions, - startOffset, - async, - cancellationToken) - .EnsureCompleted() - .Value.Content, - async startOffset => - (await StartDownloadAsync( - range, - conditionsWithEtag, - validationOptions, - startOffset, - async, - cancellationToken) - .ConfigureAwait(false)) - .Value.Content, - ClientConfiguration.Pipeline.ResponseClassifier, - Constants.MaxReliabilityRetries); + ValueTask> Factory(long offset, bool forceStructuredMessage, bool async, CancellationToken cancellationToken) + => StartDownloadAsync( + range, + conditionsWithEtag, + validationOptions, + offset, + forceStructuredMessage, + async, + cancellationToken); + async ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)> StructuredMessageFactory( + long offset, bool async, CancellationToken cancellationToken) + { + Response result = await Factory(offset, forceStructuredMessage: true, async, cancellationToken).ConfigureAwait(false); + return StructuredMessageDecodingStream.WrapStream(result.Value.Content, result.Value.Details.ContentLength); + } + Stream stream; + if (response.GetRawResponse().Headers.Contains(Constants.StructuredMessage.CrcStructuredMessageHeader)) + { + (Stream decodingStream, StructuredMessageDecodingStream.DecodedData decodedData) = StructuredMessageDecodingStream.WrapStream( + response.Value.Content, response.Value.Details.ContentLength); + stream = new StructuredMessageDecodingRetriableStream( + decodingStream, + decodedData, + startOffset => StructuredMessageFactory(startOffset, async: false, cancellationToken) + .EnsureCompleted(), + async startOffset => await StructuredMessageFactory(startOffset, async: true, cancellationToken) + .ConfigureAwait(false), + ClientConfiguration.Pipeline.ResponseClassifier, + Constants.MaxReliabilityRetries); + } + else + { + stream = RetriableStream.Create( + response.Value.Content, + startOffset => Factory(startOffset, forceStructuredMessage: false, async: false, cancellationToken) + .EnsureCompleted().Value.Content, + async startOffset => (await Factory(startOffset, forceStructuredMessage: false, async: true, cancellationToken) + .ConfigureAwait(false)).Value.Content, + ClientConfiguration.Pipeline.ResponseClassifier, + Constants.MaxReliabilityRetries); + } stream = stream.WithNoDispose().WithProgress(progressHandler); @@ -1645,6 +1662,9 @@ await ContentHasher.AssertResponseHashMatchInternal( /// /// Starting offset to request - in the event of a retry. /// + /// + /// When using transactional CRC, force the request to use structured message. + /// /// /// Whether to invoke the operation asynchronously. /// @@ -1666,6 +1686,7 @@ private async ValueTask> StartDownloadAsyn BlobRequestConditions conditions, DownloadTransferValidationOptions validationOptions, long startOffset = 0, + bool forceStructuredMessage = false, // TODO all CRC will force structured message in future bool async = true, CancellationToken cancellationToken = default) { @@ -1702,7 +1723,7 @@ private async ValueTask> StartDownloadAsyn rangeGetContentMD5 = true; break; case StorageChecksumAlgorithm.StorageCrc64: - if (pageRange?.Length <= Constants.StructuredMessage.MaxDownloadCrcWithHeader) + if (!forceStructuredMessage && pageRange?.Length <= Constants.StructuredMessage.MaxDownloadCrcWithHeader) { rangeGetContentCRC64 = true; } @@ -1757,24 +1778,8 @@ private async ValueTask> StartDownloadAsyn long length = response.IsUnavailable() ? 0 : response.Headers.ContentLength ?? 0; ClientConfiguration.Pipeline.LogTrace($"Response: {response.GetRawResponse().Status}, ContentLength: {length}"); - BlobDownloadStreamingResult result = response.ToBlobDownloadStreamingResult(); - if (response.GetRawResponse().Headers.TryGetValue(Constants.StructuredMessage.CrcStructuredMessageHeader, out string _) && - response.GetRawResponse().Headers.TryGetValue(Constants.HeaderNames.ContentLength, out string rawContentLength)) - { - (result.Content, _) = StructuredMessageDecodingStream.WrapStream(result.Content, long.Parse(rawContentLength)); - } - // if not null, we expected a structured message response - // but we didn't find one in the above condition - else if (structuredBodyType != null) - { - // okay to throw here. due to 4MB checksum limit on service downloads, and how we don't - // request structured message until we exceed that, we are not throwing on a request - // that would have otherwise succeeded and still gotten the desired checksum - throw Errors.ExpectedStructuredMessage(); - } - return Response.FromValue( - result, + response.ToBlobDownloadStreamingResult(), response.GetRawResponse()); } #endregion diff --git a/sdk/storage/Azure.Storage.Common/tests/Shared/TransferValidationTestBase.cs b/sdk/storage/Azure.Storage.Common/tests/Shared/TransferValidationTestBase.cs index 764cecb599ac5..201092978627c 100644 --- a/sdk/storage/Azure.Storage.Common/tests/Shared/TransferValidationTestBase.cs +++ b/sdk/storage/Azure.Storage.Common/tests/Shared/TransferValidationTestBase.cs @@ -2029,6 +2029,53 @@ public virtual async Task DownloadDisablesDefaultClientValidationOptions( Assert.False(response.Headers.Contains("x-ms-content-crc64")); Assert.IsTrue(dest.ToArray().SequenceEqual(data)); } + + [Test] + public virtual async Task DownloadRecoversFromInterruptWithValidation( + [ValueSource(nameof(GetValidationAlgorithms))] StorageChecksumAlgorithm algorithm) + { + using var _ = AzureEventSourceListener.CreateConsoleLogger(); + int dataLen = algorithm.ResolveAuto() switch { + StorageChecksumAlgorithm.StorageCrc64 => 5 * Constants.MB, // >4MB for multisegment + _ => Constants.KB, + }; + + await using IDisposingContainer disposingContainer = await GetDisposingContainerAsync(); + + // Arrange + var data = GetRandomBuffer(dataLen); + + TClientOptions options = ClientBuilder.GetOptions(); + options.AddPolicy(new FaultyDownloadPipelinePolicy(dataLen - 512, new IOException(), () => { }), HttpPipelinePosition.BeforeTransport); + var client = await GetResourceClientAsync( + disposingContainer.Container, + resourceLength: dataLen, + createResource: true, + options: options); + await SetupDataAsync(client, new MemoryStream(data)); + + var validationOptions = new DownloadTransferValidationOptions { ChecksumAlgorithm = algorithm }; + + // Act + var dest = new MemoryStream(); + var response = await DownloadPartitionAsync(client, dest, validationOptions, new HttpRange(length: data.Length)); + + // Assert + // no policies this time; just check response headers + switch (algorithm.ResolveAuto()) + { + case StorageChecksumAlgorithm.MD5: + Assert.True(response.Headers.Contains("Content-MD5")); + break; + case StorageChecksumAlgorithm.StorageCrc64: + Assert.True(response.Headers.Contains(Constants.StructuredMessage.CrcStructuredMessageHeader)); + break; + default: + Assert.Fail("Test can't validate given algorithm type."); + break; + } + Assert.IsTrue(dest.ToArray().SequenceEqual(data)); + } #endregion #region Auto-Algorithm Tests diff --git a/sdk/storage/Azure.Storage.Files.DataLake/assets.json b/sdk/storage/Azure.Storage.Files.DataLake/assets.json index 39ee762ad9a8a..7329a98a34f40 100644 --- a/sdk/storage/Azure.Storage.Files.DataLake/assets.json +++ b/sdk/storage/Azure.Storage.Files.DataLake/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.Files.DataLake", - "Tag": "net/storage/Azure.Storage.Files.DataLake_6d44446b20" + "Tag": "net/storage/Azure.Storage.Files.DataLake_9c23b9b180" } diff --git a/sdk/storage/Azure.Storage.Files.Shares/assets.json b/sdk/storage/Azure.Storage.Files.Shares/assets.json index d46664532748f..d4df7130a51d0 100644 --- a/sdk/storage/Azure.Storage.Files.Shares/assets.json +++ b/sdk/storage/Azure.Storage.Files.Shares/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.Files.Shares", - "Tag": "net/storage/Azure.Storage.Files.Shares_997e3d57ce" + "Tag": "net/storage/Azure.Storage.Files.Shares_5e5b51e54d" }