diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs index e7526c80c1dce..ebd38b70dffe8 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ChannelProcessing.cs @@ -15,27 +15,35 @@ namespace Azure.Storage.DataMovement; internal interface IProcessor : IDisposable { ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default); + bool TryComplete(); ProcessAsync Process { get; set; } } internal static class ChannelProcessing { - public static IProcessor NewProcessor(int parallelism) + public static IProcessor NewProcessor(int readers, int? capacity = null) { - Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism)); - return parallelism == 1 - ? new SequentialChannelProcessor( - Channel.CreateUnbounded(new UnboundedChannelOptions() - { - AllowSynchronousContinuations = true, - SingleReader = true, - })) - : new ParallelChannelProcessor( - Channel.CreateUnbounded(new UnboundedChannelOptions() - { - AllowSynchronousContinuations = true, - }), - parallelism); + Argument.AssertInRange(readers, 1, int.MaxValue, nameof(readers)); + if (capacity.HasValue) + { + Argument.AssertInRange(capacity.Value, 1, int.MaxValue, nameof(capacity)); + } + + Channel channel = capacity.HasValue + ? Channel.CreateBounded(new BoundedChannelOptions(capacity.Value) + { + AllowSynchronousContinuations = true, + SingleReader = readers == 1, + FullMode = BoundedChannelFullMode.Wait, + }) + : Channel.CreateUnbounded(new UnboundedChannelOptions() + { + AllowSynchronousContinuations = true, + SingleReader = readers == 1, + }); + return readers == 1 + ? new SequentialChannelProcessor(channel) + : new ParallelChannelProcessor(channel, readers); } private abstract class ChannelProcessor : IProcessor, IDisposable @@ -82,6 +90,8 @@ public async ValueTask QueueAsync(TItem item, CancellationToken cancellationToke await _channel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false); } + public bool TryComplete() => _channel.Writer.TryComplete(); + protected abstract ValueTask NotifyOfPendingItemProcessing(); public void Dispose() @@ -126,11 +136,11 @@ public ParallelChannelProcessor( protected override async ValueTask NotifyOfPendingItemProcessing() { - List chunkRunners = new List(DataMovementConstants.MaxJobPartReaders); + List chunkRunners = new List(_maxConcurrentProcessing); while (await _channel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) { TItem item = await _channel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - if (chunkRunners.Count >= DataMovementConstants.MaxJobPartReaders) + if (chunkRunners.Count >= _maxConcurrentProcessing) { // Clear any completed blocks from the task list int removedRunners = chunkRunners.RemoveAll(x => x.IsCompleted || x.IsCanceled || x.IsFaulted); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs index 0f59fea48999c..0ef676da3e7f3 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DownloadChunkHandler.cs @@ -4,7 +4,6 @@ using System; using System.IO; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using Azure.Storage.Common; @@ -33,11 +32,10 @@ public struct Behaviors } /// - /// Create channel of to keep track to handle - /// writing downloaded chunks to the destination as well as tracking overall progress. + /// Create channel of to handle writing + /// downloaded chunks to the destination as well as tracking overall progress. /// - private readonly Channel _downloadRangeChannel; - private readonly Task _processDownloadRangeEvents; + private readonly IProcessor _downloadRangeProcessor; private readonly CancellationToken _cancellationToken; private long _bytesTransferred; @@ -66,30 +64,18 @@ public DownloadChunkHandler( Behaviors behaviors, CancellationToken cancellationToken) { - // Set bytes transferred to the length of bytes we got back from the initial - // download request - _bytesTransferred = currentTransferred; - - // The size of the channel should never exceed 50k (limit on blocks in a block blob). - // and that's in the worst case that we never read from the channel and had a maximum chunk blob. - _downloadRangeChannel = Channel.CreateUnbounded( - new UnboundedChannelOptions() - { - // Single reader is required as we can only have one writer to the destination. - SingleReader = true, - }); - _processDownloadRangeEvents = Task.Run(NotifyOfPendingChunkDownloadEvents); - _cancellationToken = cancellationToken; - - _expectedLength = expectedLength; - if (expectedLength <= 0) { throw Errors.InvalidExpectedLength(expectedLength); } Argument.AssertNotNull(behaviors, nameof(behaviors)); - // Set values + _cancellationToken = cancellationToken; + // Set bytes transferred to the length of bytes we got back from the initial + // download request + _bytesTransferred = currentTransferred; + _expectedLength = expectedLength; + _copyToDestinationFile = behaviors.CopyToDestinationFile ?? throw Errors.ArgumentNull(nameof(behaviors.CopyToDestinationFile)); _reportProgressInBytes = behaviors.ReportProgressInBytes @@ -98,44 +84,43 @@ public DownloadChunkHandler( ?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler)); _queueCompleteFileDownload = behaviors.QueueCompleteFileDownload ?? throw Errors.ArgumentNull(nameof(behaviors.QueueCompleteFileDownload)); + + _downloadRangeProcessor = ChannelProcessing.NewProcessor( + readers: 1, + capacity: DataMovementConstants.Channels.DownloadChunkCapacity); + _downloadRangeProcessor.Process = ProcessDownloadRange; } public void Dispose() { - _downloadRangeChannel.Writer.TryComplete(); + _downloadRangeProcessor.TryComplete(); } - public void QueueChunk(QueueDownloadChunkArgs args) + public async ValueTask QueueChunkAsync(QueueDownloadChunkArgs args) { - _downloadRangeChannel.Writer.TryWrite(args); + await _downloadRangeProcessor.QueueAsync(args).ConfigureAwait(false); } - private async Task NotifyOfPendingChunkDownloadEvents() + private async Task ProcessDownloadRange(QueueDownloadChunkArgs args, CancellationToken cancellationToken = default) { try { - while (await _downloadRangeChannel.Reader.WaitToReadAsync(_cancellationToken).ConfigureAwait(false)) + // Copy the current chunk to the destination + using (Stream content = args.Content) { - // Read one event argument at a time. - QueueDownloadChunkArgs args = await _downloadRangeChannel.Reader.ReadAsync(_cancellationToken).ConfigureAwait(false); - - // Copy the current chunk to the destination - using (Stream content = args.Content) - { - await _copyToDestinationFile( - args.Offset, - args.Length, - content, - _expectedLength, - initial: _bytesTransferred == 0).ConfigureAwait(false); - } - UpdateBytesAndRange(args.Length); + await _copyToDestinationFile( + args.Offset, + args.Length, + content, + _expectedLength, + initial: _bytesTransferred == 0).ConfigureAwait(false); + } + UpdateBytesAndRange(args.Length); - // Check if we finished downloading the blob - if (_bytesTransferred == _expectedLength) - { - await _queueCompleteFileDownload().ConfigureAwait(false); - } + // Check if we finished downloading the blob + if (_bytesTransferred == _expectedLength) + { + await _queueCompleteFileDownload().ConfigureAwait(false); } } catch (Exception ex) @@ -145,10 +130,6 @@ await _copyToDestinationFile( } } - /// - /// Moves the downloader to the next range and updates/reports bytes transferred. - /// - /// private void UpdateBytesAndRange(long bytesDownloaded) { _bytesTransferred += bytesDownloaded; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index d3b74e0a0ef0e..67d1590974fab 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -7,6 +7,7 @@ using System.IO; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Azure.Core; using Azure.Core.Pipeline; @@ -379,6 +380,7 @@ public async virtual Task InvokeFailedArgAsync(Exception ex) { if (ex is not OperationCanceledException && ex is not TaskCanceledException && + ex is not ChannelClosedException && ex.InnerException is not TaskCanceledException && !ex.Message.Contains("The request was canceled.")) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index 21e598241225b..b71bbe4302dea 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -1,26 +1,27 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; + namespace Azure.Storage.DataMovement { internal class DataMovementConstants { - /// - /// Constants of the Data Movement library - /// - internal const int InitialMainPoolSize = 32; - internal const int InitialDownloadFileThreads = 32; // Max is 3000 - internal const int CpuTuningMultiplier = 16; - internal const int MaxJobPartReaders = 64; - internal const int MaxJobChunkTasks = 3000; - internal const int StatusCheckInSec = 10; internal const int DefaultStreamCopyBufferSize = 81920; // Use the .NET default - internal const long DefaultInitialTransferSize = 32 * Constants.MB; internal const long DefaultChunkSize = 4 * Constants.MB; public const char PathForwardSlashDelimiterChar = '/'; + internal static class Channels + { + internal const int MaxJobPartReaders = 32; + internal static int MaxJobChunkReaders = Environment.ProcessorCount * 8; + internal const int JobPartCapacity = 1000; + internal const int JobChunkCapacity = 1000; + internal const int DownloadChunkCapacity = 16; + } + internal static class ConcurrencyTuner { internal const int StandardMultiplier = 2; diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs index 74b61c9cefcea..282c199f5573e 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs @@ -60,9 +60,13 @@ protected TransferManager() /// Options that will apply to all transfers started by this TransferManager. public TransferManager(TransferManagerOptions options = default) : this( - ChannelProcessing.NewProcessor(parallelism: 1), - ChannelProcessing.NewProcessor(DataMovementConstants.MaxJobPartReaders), - ChannelProcessing.NewProcessor>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks), + ChannelProcessing.NewProcessor(readers: 1), + ChannelProcessing.NewProcessor( + readers: DataMovementConstants.Channels.MaxJobPartReaders, + capacity: DataMovementConstants.Channels.JobPartCapacity), + ChannelProcessing.NewProcessor>( + readers: options?.MaximumConcurrency ?? DataMovementConstants.Channels.MaxJobChunkReaders, + capacity: DataMovementConstants.Channels.JobChunkCapacity), new(ArrayPool.Shared, options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure, new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)), diff --git a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs index d159d85781c3f..6423917857242 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/UriToStreamJobPart.cs @@ -405,10 +405,13 @@ await dataStream.CopyToAsync( content.Position = 0; // The chunk handler may have been disposed in failure case - _downloadChunkHandler?.QueueChunk(new QueueDownloadChunkArgs( - offset: range.Offset, - length: (long)range.Length, - content: content)); + if (_downloadChunkHandler != null) + { + await _downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( + offset: range.Offset, + length: (long)range.Length, + content: content)).ConfigureAwait(false); + } } catch (Exception ex) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs index 43e79dbbe6c0e..6b57c74a25d80 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/DownloadChunkHandlerTests.cs @@ -10,6 +10,7 @@ using System.Threading; using Azure.Core; using Azure.Storage.Tests.Shared; +using System.Threading.Channels; namespace Azure.Storage.DataMovement.Tests { @@ -156,7 +157,7 @@ private MockDownloadChunkBehaviors GetMockDownloadChunkBehaviors() [TestCase(Constants.KB)] [TestCase(Constants.MB)] [TestCase(4 * Constants.MB)] - public void OneChunkTransfer(long blockSize) + public async Task OneChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -175,7 +176,7 @@ public void OneChunkTransfer(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - Make one chunk that would meet the expected length - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: 0, length: blockSize, content: content)); @@ -192,7 +193,7 @@ public void OneChunkTransfer(long blockSize) [Test] [TestCase(512)] [TestCase(Constants.KB)] - public void MultipleChunkTransfer(long blockSize) + public async Task MultipleChunkTransfer(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -212,7 +213,7 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - First chunk - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: 0, length: blockSize, content: content)); @@ -228,7 +229,7 @@ public void MultipleChunkTransfer(long blockSize) PredictableStream content2 = new PredictableStream(blockSize); // Act - Second/final chunk - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: blockSize, length: blockSize, content: content2)); @@ -245,7 +246,7 @@ public void MultipleChunkTransfer(long blockSize) [Test] [TestCase(512)] [TestCase(Constants.KB)] - public void MultipleChunkTransfer_EarlyChunks(long blockSize) + public async Task MultipleChunkTransfer_EarlyChunks(long blockSize) { // Arrange - Set up tasks MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -266,7 +267,7 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) PredictableStream content = new PredictableStream(blockSize); // Act - The second chunk returns first - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: blockSize, length: blockSize, content: content)); @@ -280,7 +281,7 @@ public void MultipleChunkTransfer_EarlyChunks(long blockSize) expectedCompleteFileCount: 0); // Act - The first chunk is then returned - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: 0, length: blockSize, content: content)); @@ -325,7 +326,7 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta PredictableStream content = new PredictableStream(blockSize); long offset = i * blockSize; - Task task = Task.Run(() => downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + Task task = Task.Run(async () => await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: offset, length: blockSize, content: content))); @@ -346,7 +347,7 @@ public async Task MultipleChunkTransfer_MultipleProcesses(long blockSize, int ta } [Test] - public void GetCopyToDestinationFileTask_ExpectedFailure() + public async Task GetCopyToDestinationFileTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -369,7 +370,7 @@ public void GetCopyToDestinationFileTask_ExpectedFailure() PredictableStream content = new PredictableStream(blockSize); // Act - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: 0, length: blockSize, content: content)); @@ -384,7 +385,7 @@ public void GetCopyToDestinationFileTask_ExpectedFailure() } [Test] - public void QueueCompleteFileDownloadTask_ExpectedFailure() + public async Task QueueCompleteFileDownloadTask_ExpectedFailure() { // Arrange MockDownloadChunkBehaviors mockBehaviors = GetMockDownloadChunkBehaviors(); @@ -406,7 +407,7 @@ public void QueueCompleteFileDownloadTask_ExpectedFailure() PredictableStream content = new PredictableStream(blockSize); // Act - downloadChunkHandler.QueueChunk(new QueueDownloadChunkArgs( + await downloadChunkHandler.QueueChunkAsync(new QueueDownloadChunkArgs( offset: 0, length: blockSize, content: content)); @@ -442,7 +443,9 @@ public void DisposedEventHandler() // Act downloadChunkHandler.Dispose(); - downloadChunkHandler.QueueChunk(default); + + Assert.ThrowsAsync(async () => + await downloadChunkHandler.QueueChunkAsync(default)); VerifyDelegateInvocations( behaviors: mockBehaviors, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs index 8c60f2da92b8a..6d13fd4d70b14 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Models/StepProcessor.cs @@ -29,6 +29,8 @@ public ValueTask QueueAsync(T item, CancellationToken cancellationToken = defaul return new(Task.CompletedTask); } + public bool TryComplete() => true; + /// /// Attmpts to read an item from internal queue, then completes /// a call to on it. diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferValidationTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferValidationTests.cs index fddd5757ee383..7e69489a2ed9a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferValidationTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferValidationTests.cs @@ -8,12 +8,8 @@ namespace Azure.Storage.DataMovement.Tests { - public class TransferValidationTests : DataMovementTestBase + public class TransferValidationTests { - public TransferValidationTests(bool async) : base(async, default) - { - } - [Test, Pairwise] public async Task LargeSingleFile( [Values(TransferDirection.Copy, TransferDirection.Upload, TransferDirection.Download)] TransferDirection transferDirection, @@ -82,7 +78,7 @@ public async Task LargeSingleFile_Fail_Destination( TestEventsRaised events = new(options); DataTransfer transfer = await transferManager.StartTransferAsync(srcResource, dstResource, options); - CancellationTokenSource tokenSource = new(TimeSpan.FromSeconds(10)); + CancellationTokenSource tokenSource = new(TimeSpan.FromSeconds(30)); await transfer.WaitForCompletionAsync(tokenSource.Token); Assert.That(transfer.HasCompleted, Is.True);