From 7343ed9a8ca3d53cbfa2ae0e0e15b119dc27b837 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon <96087589+jalauzon-msft@users.noreply.github.com> Date: Mon, 27 Nov 2023 15:13:13 -0800 Subject: [PATCH] [Storage][DataMovement] Various fixes to address Live test failures (#40107) --- .../assets.json | 2 +- .../src/JobPartInternal.cs | 5 +- .../src/ServiceToServiceTransferJob.cs | 11 +- .../src/StreamToUriJobPart.cs | 104 ++++++++---------- .../src/StreamToUriTransferJob.cs | 11 +- .../src/TransferJobInternal.cs | 4 +- .../StartTransferUploadDirectoryTestBase.cs | 12 +- 7 files changed, 80 insertions(+), 69 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/assets.json b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/assets.json index 4588fe9c7ea19..3234e0634c826 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/assets.json +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/assets.json @@ -2,5 +2,5 @@ "AssetsRepo": "Azure/azure-sdk-assets", "AssetsRepoPrefixPath": "net", "TagPrefix": "net/storage/Azure.Storage.DataMovement.Files.Shares", - "Tag": "net/storage/Azure.Storage.DataMovement.Files.Shares_330a0b2660" + "Tag": "net/storage/Azure.Storage.DataMovement.Files.Shares_e43c13a85f" } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs index 8254018aafac3..a2525a5cbd838 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs @@ -378,8 +378,9 @@ await PartTransferStatusEventHandler.RaiseAsync( /// public async virtual Task InvokeFailedArg(Exception ex) { - if (ex is not OperationCanceledException - && ex is not TaskCanceledException) + if (ex is not OperationCanceledException && + ex is not TaskCanceledException && + !ex.Message.Contains("The request was canceled.")) { SetFailureType(ex.Message); if (TransferFailedEventHandler != null) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs index 733a1179880e4..124100c6baa9f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/ServiceToServiceTransferJob.cs @@ -181,7 +181,16 @@ private async IAsyncEnumerable GetStorageResourcesAsync() : current.Uri.GetPath().Substring(containerUriPath.Length + 1); StorageResourceContainer subContainer = _destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath); - await subContainer.CreateIfNotExistsAsync().ConfigureAwait(false); + + try + { + await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } } else { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs index 3ee3be33bb511..0f9bdeb2bc0f9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs @@ -252,12 +252,21 @@ private async Task CreateDestinationResource(long blockSize, long length, // Whether or not we continue is up to whether this was single put call or not. return !singleCall; } - catch (RequestFailedException exception) - when (_createMode == StorageResourceCreationPreference.SkipIfExists - && exception.ErrorCode == "BlobAlreadyExists") + catch (RequestFailedException r) + when (r.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists) { await InvokeSkippedArg().ConfigureAwait(false); } + catch (InvalidOperationException i) + when (i.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists) + { + await InvokeSkippedArg().ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArg(ex).ConfigureAwait(false); + } + // Do not continue if we need to skip or there was an error. return false; } @@ -268,66 +277,49 @@ private async Task CreateDestinationResource(long blockSize, long length, /// private async Task InitialUploadCall(long blockSize, long expectedLength, bool singleCall) { - try + if (singleCall) { - if (singleCall) - { - StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( - cancellationToken: _cancellationToken).ConfigureAwait(false); + StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( + cancellationToken: _cancellationToken).ConfigureAwait(false); - using Stream stream = result.Content; - await _destinationResource.CopyFromStreamAsync( - stream: stream, - overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists, - streamLength: blockSize, - completeLength: expectedLength, - cancellationToken: _cancellationToken).ConfigureAwait(false); + using Stream stream = result.Content; + await _destinationResource.CopyFromStreamAsync( + stream: stream, + overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists, + streamLength: blockSize, + completeLength: expectedLength, + cancellationToken: _cancellationToken).ConfigureAwait(false); - // Report bytes written before completion - ReportBytesWritten(blockSize); + // Report bytes written before completion + ReportBytesWritten(blockSize); - // Set completion status to completed - await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false); - } - else + // Set completion status to completed + await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false); + } + else + { + Stream slicedStream = Stream.Null; + StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( + position: 0, + length: blockSize, + cancellationToken: _cancellationToken).ConfigureAwait(false); + using (Stream stream = result.Content) { - Stream slicedStream = Stream.Null; - StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync( - position: 0, - length: blockSize, + slicedStream = await GetOffsetPartitionInternal( + stream, + 0L, + blockSize, + UploadArrayPool, + _cancellationToken).ConfigureAwait(false); + await _destinationResource.CopyFromStreamAsync( + stream: slicedStream, + streamLength: blockSize, + overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists, + completeLength: expectedLength, cancellationToken: _cancellationToken).ConfigureAwait(false); - using (Stream stream = result.Content) - { - slicedStream = await GetOffsetPartitionInternal( - stream, - 0L, - blockSize, - UploadArrayPool, - _cancellationToken).ConfigureAwait(false); - await _destinationResource.CopyFromStreamAsync( - stream: slicedStream, - streamLength: blockSize, - overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists, - completeLength: expectedLength, - cancellationToken: _cancellationToken).ConfigureAwait(false); - } - - ReportBytesWritten(blockSize); } - } - catch (RequestFailedException ex) - when (ex.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists) - { - await InvokeSkippedArg().ConfigureAwait(false); - } - catch (InvalidOperationException ex) - when (ex.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists) - { - await InvokeSkippedArg().ConfigureAwait(false); - } - catch (Exception ex) - { - await InvokeFailedArg(ex).ConfigureAwait(false); + + ReportBytesWritten(blockSize); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs index 2ca04b94c144b..bb966ad904483 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StreamToUriTransferJob.cs @@ -179,7 +179,16 @@ private async IAsyncEnumerable GetStorageResourcesAsync() : current.Uri.GetPath().Substring(containerUriPath.Length + 1); StorageResourceContainer subContainer = _destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath); - await subContainer.CreateIfNotExistsAsync().ConfigureAwait(false); + + try + { + await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + await InvokeFailedArgAsync(ex).ConfigureAwait(false); + yield break; + } } else { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs index ec99f39ae8bce..b3abb47c44e56 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs @@ -287,7 +287,9 @@ public async Task TriggerJobCancellationAsync() /// public async virtual Task InvokeFailedArgAsync(Exception ex) { - if (ex is not OperationCanceledException) + if (ex is not OperationCanceledException && + ex is not TaskCanceledException && + !ex.Message.Contains("The request was canceled.")) { if (TransferFailedEventHandler != null) { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StartTransferUploadDirectoryTestBase.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StartTransferUploadDirectoryTestBase.cs index 0fdb9b1bd0a52..dec2ba0363cc0 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StartTransferUploadDirectoryTestBase.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/StartTransferUploadDirectoryTestBase.cs @@ -175,7 +175,6 @@ public async Task Upload(long objectSize, int waitTimeInSec) GetNewObjectName(), GetNewObjectName(), }; - Console.WriteLine($"files: {string.Join(", ", files)}"); CancellationToken cancellationToken = TestHelper.GetTimeoutToken(waitTimeInSec); await SetupDirectoryAsync( @@ -194,7 +193,7 @@ await UploadDirectoryAndVerifyAsync( [TestCase(DataTransferErrorMode.StopOnAnyFailure)] public async Task UploadFailIfExists(DataTransferErrorMode errorMode) { - const int waitTimeInSec = 5; + const int waitTimeInSec = 10; const int preexistingFileCount = 2; const int skipCount = 1; const int totalFileCount = skipCount + preexistingFileCount + 1; @@ -260,7 +259,7 @@ await SetupDirectoryAsync( [Test] public async Task UploadSkipIfExists() { - const int waitTimeInSec = 5; + const int waitTimeInSec = 10; const int preexistingFileCount = 2; const int skipCount = 1; const int totalFileCount = skipCount + preexistingFileCount + 1; @@ -386,11 +385,10 @@ await UploadDirectoryAndVerifyAsync( } [RecordedTest] - [TestCase(1)] - [TestCase(5)] - public async Task UploadEmpty(int folderDepth) + [TestCase(1, 10)] + [TestCase(5, 30)] + public async Task UploadEmpty(int folderDepth, int waitTimeInSec) { - const int waitTimeInSec = 10; // Arrange using DisposingLocalDirectory disposingLocalDirectory = DisposingLocalDirectory.GetTestDirectory(); await using IDisposingContainer test = await GetDisposingContainerAsync();