From 14597df4a026055d072114eb00afe2dedc48baad Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 7 Oct 2024 17:40:33 -0700 Subject: [PATCH 1/2] Fix perf job cleanup and minor stream fixes --- .../Infrastructure/DirectoryTransferTest.cs | 14 ++++++- .../src/LocalFileStorageResource.cs | 1 + .../src/StreamToUriJobPart.cs | 38 ++++++++++--------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs index e136797bc2498..2713314446468 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs @@ -91,8 +91,18 @@ protected async Task RunAndVerifyTransferAsync( DataTransfer transfer = await transferManager.StartTransferAsync( source, destination, options, cancellationToken); - await transfer.WaitForCompletionAsync(cancellationToken); - if (!transfer.TransferStatus.HasCompletedSuccessfully) + // The test runs for a specified duration and then cancels the token. + // When canceled, pause the currently running transfer so it can be + // cleaned up. + cancellationToken.Register(async () => + { + // Don't pass cancellation token since its already cancelled. + await transfer.PauseAsync(); + }); + await transfer.WaitForCompletionAsync(); + + if (!transfer.TransferStatus.HasCompletedSuccessfully && + transfer.TransferStatus.State != DataTransferState.Paused) { throw new Exception("A failure occurred during the transfer."); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs index bc8e208ec3922..fc64e3b1d6374 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs @@ -84,6 +84,7 @@ protected internal override Task ReadStreamAsyn long? length = default, CancellationToken cancellationToken = default) { + CancellationHelper.ThrowIfCancellationRequested(cancellationToken); FileStream stream = new FileStream(_uri.LocalPath, FileMode.Open, FileAccess.Read); stream.Position = position; return Task.FromResult(new StorageResourceReadStreamResult(stream)); diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index 6793937cf3012..39197042075f0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -308,8 +308,9 @@ private async Task InitialUploadCall( StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( cancellationToken: _cancellationToken).ConfigureAwait(false); - using Stream stream = result.Content; - await _destinationResource.CopyFromStreamAsync( + using (Stream stream = result.Content) + { + await _destinationResource.CopyFromStreamAsync( stream: stream, overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists, streamLength: blockSize, @@ -319,6 +320,7 @@ await _destinationResource.CopyFromStreamAsync( SourceProperties = sourceProperties }, cancellationToken: _cancellationToken).ConfigureAwait(false); + } // Report bytes written before completion ReportBytesWritten(blockSize); @@ -328,19 +330,19 @@ await _destinationResource.CopyFromStreamAsync( } else { - Stream slicedStream = Stream.Null; StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( position: 0, length: blockSize, cancellationToken: _cancellationToken).ConfigureAwait(false); - using (Stream stream = result.Content) + + using (Stream contentStream = result.Content) + using (Stream slicedStream = await GetOffsetPartitionInternal( + contentStream, + 0L, + blockSize, + UploadArrayPool, + _cancellationToken).ConfigureAwait(false)) { - slicedStream = await GetOffsetPartitionInternal( - stream, - 0L, - blockSize, - UploadArrayPool, - _cancellationToken).ConfigureAwait(false); await _destinationResource.CopyFromStreamAsync( stream: slicedStream, streamLength: blockSize, @@ -394,19 +396,19 @@ internal async Task StageBlockInternal( { try { - Stream slicedStream = Stream.Null; StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( position: offset, length: blockLength, cancellationToken: _cancellationToken).ConfigureAwait(false); - using (Stream stream = result.Content) + + using (Stream contentStream = result.Content) + using (Stream slicedStream = await GetOffsetPartitionInternal( + contentStream, + offset, + blockLength, + UploadArrayPool, + _cancellationToken).ConfigureAwait(false)) { - slicedStream = await GetOffsetPartitionInternal( - stream, - offset, - blockLength, - UploadArrayPool, - _cancellationToken).ConfigureAwait(false); await _destinationResource.CopyFromStreamAsync( stream: slicedStream, streamLength: blockLength, From b9040b200aec6796f6dbf551c54578680cd2d85b Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Tue, 8 Oct 2024 17:18:52 -0700 Subject: [PATCH 2/2] More fixes --- .../Infrastructure/DirectoryTransferTest.cs | 14 +++++++++++--- .../src/JobPartInternal.cs | 1 + 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs index 2713314446468..fc6e09eec7776 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/perf/Azure.Storage.DataMovement.Blobs.Perf/Infrastructure/DirectoryTransferTest.cs @@ -18,6 +18,7 @@ public abstract class DirectoryTransferTest : PerfTest where protected BlobServiceClient BlobServiceClient { get; } protected LocalFilesStorageResourceProvider LocalFileResourceProvider { get; } protected BlobsStorageResourceProvider BlobResourceProvider { get; } + private TimeSpan _transferTimeout; public DirectoryTransferTest(TOptions options) : base(options) { @@ -25,6 +26,7 @@ public DirectoryTransferTest(TOptions options) : base(options) BlobServiceClient = new BlobServiceClient(PerfTestEnvironment.Instance.BlobStorageEndpoint, PerfTestEnvironment.Instance.Credential); LocalFileResourceProvider = new LocalFilesStorageResourceProvider(); BlobResourceProvider = new BlobsStorageResourceProvider(PerfTestEnvironment.Instance.Credential); + _transferTimeout = TimeSpan.FromSeconds(5 + (Options.Count * Options.Size) / (1 * 1024 * 1024)); } protected string CreateLocalDirectory(bool populate = false) @@ -92,14 +94,20 @@ protected async Task RunAndVerifyTransferAsync( source, destination, options, cancellationToken); // The test runs for a specified duration and then cancels the token. - // When canceled, pause the currently running transfer so it can be - // cleaned up. + // When canceled, pause the currently running transfer so it can be cleaned up. cancellationToken.Register(async () => { // Don't pass cancellation token since its already cancelled. await transfer.PauseAsync(); }); - await transfer.WaitForCompletionAsync(); + + // The cancellation token we specify for WaitForCompletion should not + // be the one passed to the test as we don't want this code to exit until + // the transfer is complete or paused so it can be properly cleaned up. + // However, we pass a token with a generous time to prevent the transfer + // from hanging forever if there is an issue. + CancellationTokenSource ctx = new(_transferTimeout); + await transfer.WaitForCompletionAsync(ctx.Token); if (!transfer.TransferStatus.HasCompletedSuccessfully && transfer.TransferStatus.State != DataTransferState.Paused) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index a965c3b1edf71..29d356e1c54f6 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -379,6 +379,7 @@ public async virtual Task InvokeFailedArg(Exception ex) { if (ex is not OperationCanceledException && ex is not TaskCanceledException && + ex.InnerException is not TaskCanceledException && !ex.Message.Contains("The request was canceled.")) { if (ex is RequestFailedException requestFailedException)