Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRC reporting: pt 1 #45307

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/BlobBaseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1552,7 +1552,7 @@ ValueTask<Response<BlobDownloadStreamingResult>> Factory(long offset, bool force
forceStructuredMessage,
async,
cancellationToken);
async ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)> StructuredMessageFactory(
async ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.RawDecodedData DecodedData)> StructuredMessageFactory(
long offset, bool async, CancellationToken cancellationToken)
{
Response<BlobDownloadStreamingResult> result = await Factory(offset, forceStructuredMessage: true, async, cancellationToken).ConfigureAwait(false);
Expand All @@ -1561,11 +1561,12 @@ ValueTask<Response<BlobDownloadStreamingResult>> Factory(long offset, bool force
Stream stream;
if (response.GetRawResponse().Headers.Contains(Constants.StructuredMessage.StructuredMessageHeader))
{
(Stream decodingStream, StructuredMessageDecodingStream.DecodedData decodedData) = StructuredMessageDecodingStream.WrapStream(
(Stream decodingStream, StructuredMessageDecodingStream.RawDecodedData decodedData) = StructuredMessageDecodingStream.WrapStream(
response.Value.Content, response.Value.Details.ContentLength);
stream = new StructuredMessageDecodingRetriableStream(
decodingStream,
decodedData,
StructuredMessage.Flags.StorageCrc64,
startOffset => StructuredMessageFactory(startOffset, async: false, cancellationToken)
.EnsureCompleted(),
async startOffset => await StructuredMessageFactory(startOffset, async: true, cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,62 @@ namespace Azure.Storage
/// </summary>
internal static class StorageCrc64Composer
{
// NOTE(review): this span is a diff rendering without +/- markers. Several signatures and
// statements appear twice: a Memory<byte>-returning form and a byte[]-returning form.
// Presumably the byte[] forms are the current ones — confirm against the real file before editing.
//
// byte[]-tuple overloads: each partition's CRC bytes are converted with BitConverter.ToUInt64,
// the resulting (ulong, length) pairs are passed to the ulong-based Compose overload, and the
// composed value is turned back into bytes with BitConverter.GetBytes.
public static Memory<byte> Compose(params (byte[] Crc64, long OriginalDataLength)[] partitions)
public static byte[] Compose(params (byte[] Crc64, long OriginalDataLength)[] partitions)
=> Compose(partitions.AsEnumerable());

public static byte[] Compose(IEnumerable<(byte[] Crc64, long OriginalDataLength)> partitions)
{
return Compose(partitions.AsEnumerable());
ulong result = Compose(partitions.Select(tup => (BitConverter.ToUInt64(tup.Crc64, 0), tup.OriginalDataLength)));
return BitConverter.GetBytes(result);
}

// ReadOnlyMemory<byte>-tuple overloads: same flow as above. The span-based
// BitConverter.ToUInt64 is used when NETSTANDARD2_1_OR_GREATER or NETCOREAPP3_0_OR_GREATER is
// defined; otherwise the memory is first copied to an array with ToArray().
// NOTE(review): BitConverter reads/writes in the machine's byte order, whereas
// StructuredMessage.ReadStreamFooter/ReadSegmentFooter decode CRCs with
// BinaryPrimitives.ReadUInt64LittleEndian — confirm the difference is intended.
public static Memory<byte> Compose(IEnumerable<(byte[] Crc64, long OriginalDataLength)> partitions)
public static byte[] Compose(params (ReadOnlyMemory<byte> Crc64, long OriginalDataLength)[] partitions)
=> Compose(partitions.AsEnumerable());

public static byte[] Compose(IEnumerable<(ReadOnlyMemory<byte> Crc64, long OriginalDataLength)> partitions)
{
ulong result = Compose(partitions.Select(tup => (BitConverter.ToUInt64(tup.Crc64, 0), tup.OriginalDataLength)));
return new Memory<byte>(BitConverter.GetBytes(result));
#if NETSTANDARD2_1_OR_GREATER || NETCOREAPP3_0_OR_GREATER
ulong result = Compose(partitions.Select(tup => (BitConverter.ToUInt64(tup.Crc64.Span), tup.OriginalDataLength)));
#else
ulong result = Compose(partitions.Select(tup => (System.BitConverter.ToUInt64(tup.Crc64.ToArray(), 0), tup.OriginalDataLength)));
#endif
return BitConverter.GetBytes(result);
}

/// <summary>
/// Composes the CRC64 values of two partitions (left followed by right) into a single
/// CRC64 by delegating to the <c>ulong</c>-based <c>Compose</c> overload.
/// </summary>
/// <param name="leftCrc64">CRC64 bytes of the left partition; the first 8 bytes are read as a little-endian value.</param>
/// <param name="leftOriginalDataLength">Length of the data the left CRC was computed over.</param>
/// <param name="rightCrc64">CRC64 bytes of the right partition; the first 8 bytes are read as a little-endian value.</param>
/// <param name="rightOriginalDataLength">Length of the data the right CRC was computed over.</param>
/// <returns>A new 8-byte array holding the composed CRC64 in little-endian order.</returns>
/// <exception cref="ArgumentOutOfRangeException">Either CRC span is shorter than 8 bytes.</exception>
public static byte[] Compose(
    ReadOnlySpan<byte> leftCrc64, long leftOriginalDataLength,
    ReadOnlySpan<byte> rightCrc64, long rightOriginalDataLength)
{
    // BinaryPrimitives reads directly from the span on every target framework, so no
    // conditional compilation and no temporary ToArray() copies are needed. It also pins the
    // byte order to little-endian, matching how StructuredMessage decodes footer CRCs,
    // instead of depending on the machine's byte order as BitConverter does.
    ulong left = System.Buffers.Binary.BinaryPrimitives.ReadUInt64LittleEndian(leftCrc64);
    ulong right = System.Buffers.Binary.BinaryPrimitives.ReadUInt64LittleEndian(rightCrc64);

    ulong result = Compose(
        (left, leftOriginalDataLength),
        (right, rightOriginalDataLength));

    byte[] composed = new byte[sizeof(ulong)];
    System.Buffers.Binary.BinaryPrimitives.WriteUInt64LittleEndian(composed, result);
    return composed;
}

/// <summary>
/// Convenience overload: forwards the supplied partitions, in the order given, to the
/// <see cref="IEnumerable{T}"/>-based <c>Compose</c> overload and returns its result.
/// </summary>
/// <param name="partitions">CRC64 value and original data length of each partition.</param>
/// <returns>The value returned by the enumerable-based overload.</returns>
public static ulong Compose(params (ulong Crc64, long OriginalDataLength)[] partitions)
{
    IEnumerable<(ulong Crc64, long OriginalDataLength)> sequence = partitions.AsEnumerable();
    return Compose(sequence);
}

// Folds the partitions left to right into one CRC64. The running result (composedCrc over
// composedDataLength bytes) is passed to StorageCrc64Calculator.Concatenate as "A" and the
// current partition as "B"; all initial-CRC arguments are 0. The running length is then
// advanced by the partition's length. An empty sequence returns 0 because the loop never runs.
// NOTE(review): diff rendering without +/- markers — the `foreach (var tup ...)` header and the
// `tup.`-based argument lines look like the earlier form, and the deconstructing
// `foreach ((ulong crc64, long originalDataLength) ...)` lines like the current form; confirm.
public static ulong Compose(IEnumerable<(ulong Crc64, long OriginalDataLength)> partitions)
{
ulong composedCrc = 0;
long composedDataLength = 0;
foreach (var tup in partitions)
foreach ((ulong crc64, long originalDataLength) in partitions)
{
composedCrc = StorageCrc64Calculator.Concatenate(
uInitialCrcAB: 0,
uInitialCrcA: 0,
uFinalCrcA: composedCrc,
uSizeA: (ulong) composedDataLength,
uInitialCrcB: 0,
uFinalCrcB: tup.Crc64,
uSizeB: (ulong)tup.OriginalDataLength);
composedDataLength += tup.OriginalDataLength;
uFinalCrcB: crc64,
uSizeB: (ulong)originalDataLength);
composedDataLength += originalDataLength;
}
return composedCrc;
}
Expand Down
36 changes: 14 additions & 22 deletions sdk/storage/Azure.Storage.Common/src/Shared/StructuredMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,18 @@ public static IDisposable GetStreamHeaderBytes(
#endregion

#region StreamFooter
/// <summary>
/// Gets the size of the stream footer for the given flags.
/// </summary>
/// <param name="flags">Message flags to inspect.</param>
/// <returns>
/// <c>Crc64Length</c> when <paramref name="flags"/> includes <c>Flags.StorageCrc64</c>; otherwise 0.
/// </returns>
public static int GetStreamFooterSize(Flags flags)
{
    bool hasCrc = (flags & Flags.StorageCrc64) == Flags.StorageCrc64;
    return hasCrc ? Crc64Length : 0;
}

/// <summary>
/// Reads a stream footer. When <c>flags</c> includes <c>Flags.StorageCrc64</c>, the first 8 bytes
/// of <c>buffer</c> are decoded as a little-endian <c>ulong</c> into <c>crc64</c>; otherwise
/// <c>crc64</c> is set to <c>default</c>. The buffer length is checked against the stream footer
/// size through <c>Errors.AssertBufferExactSize</c>.
/// </summary>
// NOTE(review): diff rendering without +/- markers — the `Span<byte> crc64 = default` parameter,
// the `int expectedBufferSize = 0;` line and both `if (!crc64.IsEmpty)` blocks look like the
// earlier form of this method; the `Flags flags, out ulong crc64` lines look like the current form.
public static void ReadStreamFooter(
ReadOnlySpan<byte> buffer,
Span<byte> crc64 = default)
Flags flags,
out ulong crc64)
{
int expectedBufferSize = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
expectedBufferSize += Crc64Length;
}
// Size the buffer with the stream-footer helper, not the segment-footer one. Both return the
// same value today, but this reader should follow GetStreamFooterSize if the two ever diverge.
int expectedBufferSize = GetStreamFooterSize(flags);
Errors.AssertBufferExactSize(buffer, expectedBufferSize, nameof(buffer));

if (!crc64.IsEmpty)
{
buffer.Slice(0, Crc64Length).CopyTo(crc64);
}
crc64 = flags.HasFlag(Flags.StorageCrc64) ? BinaryPrimitives.ReadUInt64LittleEndian(buffer) : default;
}

public static int WriteStreamFooter(Span<byte> buffer, ReadOnlySpan<byte> crc64 = default)
Expand Down Expand Up @@ -193,22 +189,18 @@ public static IDisposable GetSegmentHeaderBytes(
#endregion

#region SegmentFooter
/// <summary>
/// Gets the size of a segment footer for the given flags.
/// </summary>
/// <param name="flags">Message flags to inspect.</param>
/// <returns>
/// <c>Crc64Length</c> when <paramref name="flags"/> includes <c>Flags.StorageCrc64</c>; otherwise 0.
/// </returns>
public static int GetSegmentFooterSize(Flags flags)
{
    if ((flags & Flags.StorageCrc64) == Flags.StorageCrc64)
    {
        return Crc64Length;
    }

    return 0;
}

// Reads a segment footer. When flags includes Flags.StorageCrc64, the first 8 bytes of buffer
// are decoded as a little-endian ulong into crc64; otherwise crc64 is set to default.
// The buffer length is checked against GetSegmentFooterSize(flags) through
// Errors.AssertBufferExactSize.
// NOTE(review): diff rendering without +/- markers — the `Span<byte> crc64 = default` parameter,
// the `int expectedBufferSize = 0;` line and both `if (!crc64.IsEmpty)` blocks look like the
// earlier form of this method; confirm which lines are live before editing.
public static void ReadSegmentFooter(
ReadOnlySpan<byte> buffer,
Span<byte> crc64 = default)
Flags flags,
out ulong crc64)
{
int expectedBufferSize = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
expectedBufferSize += Crc64Length;
}
int expectedBufferSize = GetSegmentFooterSize(flags);
Errors.AssertBufferExactSize(buffer, expectedBufferSize, nameof(buffer));

if (!crc64.IsEmpty)
{
buffer.Slice(0, Crc64Length).CopyTo(crc64);
}
crc64 = flags.HasFlag(Flags.StorageCrc64) ? BinaryPrimitives.ReadUInt64LittleEndian(buffer) : default;
}

public static int WriteSegmentFooter(Span<byte> buffer, ReadOnlySpan<byte> crc64 = default)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
Expand All @@ -15,44 +16,59 @@ namespace Azure.Storage.Shared;

internal class StructuredMessageDecodingRetriableStream : Stream
{
/// <summary>
/// Result object handed to the completion callback of the retriable decoding stream.
/// </summary>
public class DecodedData
{
/// <summary>
/// CRC64 value for the content. OnCompleted assigns it from ValidateCrc() only when CRC
/// tracking is active; otherwise it keeps its default of 0.
/// </summary>
public ulong Crc { get; set; }
}

private readonly Stream _innerRetriable;
private long _decodedBytesRead;

private readonly List<StructuredMessageDecodingStream.DecodedData> _decodedDatas;
private readonly Action<StructuredMessageDecodingStream.DecodedData> _onComplete;
private readonly StructuredMessage.Flags _expectedFlags;
private readonly List<StructuredMessageDecodingStream.RawDecodedData> _decodedDatas;
private readonly Action<DecodedData> _onComplete;

private StorageCrc64HashAlgorithm _totalContentCrc;

private readonly Func<long, (Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)> _decodingStreamFactory;
private readonly Func<long, ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)>> _decodingAsyncStreamFactory;
private readonly Func<long, (Stream DecodingStream, StructuredMessageDecodingStream.RawDecodedData DecodedData)> _decodingStreamFactory;
private readonly Func<long, ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.RawDecodedData DecodedData)>> _decodingAsyncStreamFactory;

// Builds the retriable decoding stream: stores the sync/async decoding-stream factories, wraps
// the initial decoding stream with RetriableStream.Create (using StreamFactory /
// StreamFactoryAsync as the re-open callbacks), seeds _decodedDatas with the initial decoded
// data, and creates the running content CRC only when expectedFlags includes StorageCrc64.
// NOTE(review): diff rendering without +/- markers — parameters typed with
// StructuredMessageDecodingStream.DecodedData look like the earlier signature and those typed
// with RawDecodedData / Action<DecodedData> like the current one; confirm before editing.
public StructuredMessageDecodingRetriableStream(
Stream initialDecodingStream,
StructuredMessageDecodingStream.DecodedData initialDecodedData,
Func<long, (Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)> decodingStreamFactory,
Func<long, ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.DecodedData DecodedData)>> decodingAsyncStreamFactory,
Action<StructuredMessageDecodingStream.DecodedData> onComplete,
StructuredMessageDecodingStream.RawDecodedData initialDecodedData,
StructuredMessage.Flags expectedFlags,
Func<long, (Stream DecodingStream, StructuredMessageDecodingStream.RawDecodedData DecodedData)> decodingStreamFactory,
Func<long, ValueTask<(Stream DecodingStream, StructuredMessageDecodingStream.RawDecodedData DecodedData)>> decodingAsyncStreamFactory,
Action<DecodedData> onComplete,
ResponseClassifier responseClassifier,
int maxRetries)
{
_decodingStreamFactory = decodingStreamFactory;
_decodingAsyncStreamFactory = decodingAsyncStreamFactory;
_innerRetriable = RetriableStream.Create(initialDecodingStream, StreamFactory, StreamFactoryAsync, responseClassifier, maxRetries);
_decodedDatas = new() { initialDecodedData };
_expectedFlags = expectedFlags;
_onComplete = onComplete;

// A null _totalContentCrc means CRC tracking is off; OnCompleted checks for null before validating.
if (expectedFlags.HasFlag(StructuredMessage.Flags.StorageCrc64))
{
_totalContentCrc = StorageCrc64HashAlgorithm.Create();
}
}

// Synchronous re-open callback passed to RetriableStream.Create. The incoming offset argument
// is ignored; the restart offset is instead derived from the segment data recorded in
// _decodedDatas. A new decoding stream is requested at that offset, its decoded-data holder is
// recorded, and the stream is fast-forwarded by (_decodedBytesRead - offset) bytes before
// being returned.
// NOTE(review): diff rendering without +/- markers — two `long offset = ...` lines and two
// tuple-deconstruction lines are present; the SegmentLen-summing / RawDecodedData pair looks
// like the current form. Confirm before editing.
private Stream StreamFactory(long _)
{
long offset = _decodedDatas.Select(d => d.SegmentCrcs?.LastOrDefault().SegmentEnd ?? 0).Sum();
(Stream decodingStream, StructuredMessageDecodingStream.DecodedData decodedData) = _decodingStreamFactory(offset);
long offset = _decodedDatas.SelectMany(d => d.SegmentCrcs).Select(s => s.SegmentLen).Sum();
(Stream decodingStream, StructuredMessageDecodingStream.RawDecodedData decodedData) = _decodingStreamFactory(offset);
_decodedDatas.Add(decodedData);
FastForwardInternal(decodingStream, _decodedBytesRead - offset, false).EnsureCompleted();
return decodingStream;
}

private async ValueTask<Stream> StreamFactoryAsync(long _)
{
long offset = _decodedDatas.Select(d => d.SegmentCrcs?.LastOrDefault().SegmentEnd ?? 0).Sum();
(Stream decodingStream, StructuredMessageDecodingStream.DecodedData decodedData) = await _decodingAsyncStreamFactory(offset).ConfigureAwait(false);
long offset = _decodedDatas.SelectMany(d => d.SegmentCrcs).Select(s => s.SegmentLen).Sum();
(Stream decodingStream, StructuredMessageDecodingStream.RawDecodedData decodedData) = await _decodingAsyncStreamFactory(offset).ConfigureAwait(false);
_decodedDatas.Add(decodedData);
await FastForwardInternal(decodingStream, _decodedBytesRead - offset, true).ConfigureAwait(false);
return decodingStream;
Expand Down Expand Up @@ -81,21 +97,41 @@ private static async ValueTask FastForwardInternal(Stream stream, long bytes, bo

// Releases the stream: disposes each entry of _decodedDatas, clears the list, and disposes the
// inner retriable stream.
// NOTE(review): diff rendering without +/- markers — some of the _decodedDatas lines may be
// removed lines; confirm which are live.
// NOTE(review): the `disposing` argument is not consulted and base.Dispose(disposing) is not
// called in the lines shown — confirm whether that is intentional.
protected override void Dispose(bool disposing)
{
foreach (IDisposable data in _decodedDatas)
{
data.Dispose();
}
_decodedDatas.Clear();
_innerRetriable.Dispose();
}

// Called by the Read overloads when a read returns 0 bytes. Builds the final DecodedData,
// fills in Crc from ValidateCrc() when CRC tracking is active (_totalContentCrc is non-null),
// and invokes the optional completion callback with it.
// NOTE(review): diff rendering without +/- markers — two `final` declarations are present;
// the StructuredMessageDecodingStream.DecodedData one (and possibly the TODO) looks like the
// earlier form. Confirm before editing.
private void OnCompleted()
{
StructuredMessageDecodingStream.DecodedData final = new();
// TODO
DecodedData final = new();
if (_totalContentCrc != null)
{
final.Crc = ValidateCrc();
}
_onComplete?.Invoke(final);
}

/// <summary>
/// Compares the CRC64 accumulated over the bytes read from this stream with the CRC64 reported
/// by the decoded message data, and throws on mismatch.
/// </summary>
/// <remarks>
/// With a single decoded-data entry, its <c>TotalCrc</c> is the reported value. With several
/// entries, the reported value is composed from the segment CRCs of all entries.
/// </remarks>
/// <returns>The locally calculated CRC64 when it matches the reported one.</returns>
private ulong ValidateCrc()
{
    int crcLength = StructuredMessage.Crc64Length;

    // One pooled scratch buffer: first half for the calculated CRC bytes, second half for the
    // reported CRC bytes (only filled in when building the mismatch error).
    using IDisposable rental = ArrayPool<byte>.Shared.RentDisposable(crcLength * 2, out byte[] scratch);

    Span<byte> actualBytes = scratch.AsSpan(0, crcLength);
    _totalContentCrc.GetCurrentHash(actualBytes);
    ulong actual = BinaryPrimitives.ReadUInt64LittleEndian(actualBytes);

    ulong expected;
    if (_decodedDatas.Count == 1)
    {
        expected = _decodedDatas.First().TotalCrc.Value;
    }
    else
    {
        expected = StorageCrc64Composer.Compose(_decodedDatas.SelectMany(d => d.SegmentCrcs));
    }

    if (actual == expected)
    {
        return actual;
    }

    Span<byte> expectedBytes = scratch.AsSpan(actualBytes.Length, crcLength);
    BinaryPrimitives.WriteUInt64LittleEndian(expectedBytes, expected);
    throw Errors.ChecksumMismatch(actualBytes, expectedBytes);
}

#region Read
public override int Read(byte[] buffer, int offset, int count)
{
Expand All @@ -105,6 +141,10 @@ public override int Read(byte[] buffer, int offset, int count)
{
OnCompleted();
}
else
{
_totalContentCrc?.Append(new ReadOnlySpan<byte>(buffer, offset, read));
}
return read;
}

Expand All @@ -116,6 +156,10 @@ public override async Task<int> ReadAsync(byte[] buffer, int offset, int count,
{
OnCompleted();
}
else
{
_totalContentCrc?.Append(new ReadOnlySpan<byte>(buffer, offset, read));
}
return read;
}

Expand All @@ -128,6 +172,10 @@ public override int Read(Span<byte> buffer)
{
OnCompleted();
}
else
{
_totalContentCrc?.Append(buffer.Slice(0, read));
}
return read;
}

Expand All @@ -139,6 +187,10 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
{
OnCompleted();
}
else
{
_totalContentCrc?.Append(buffer.Span.Slice(0, read));
}
return read;
}
#endif
Expand Down
Loading
Loading