Skip to content

Commit

Permalink
[Storage][DataMovement] Various fixes to address Live test failures (A…
Browse files Browse the repository at this point in the history
  • Loading branch information
jalauzon-msft authored Nov 27, 2023
1 parent 1e56406 commit 7343ed9
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.DataMovement.Files.Shares",
"Tag": "net/storage/Azure.Storage.DataMovement.Files.Shares_330a0b2660"
"Tag": "net/storage/Azure.Storage.DataMovement.Files.Shares_e43c13a85f"
}
5 changes: 3 additions & 2 deletions sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ await PartTransferStatusEventHandler.RaiseAsync(
/// </summary>
public async virtual Task InvokeFailedArg(Exception ex)
{
if (ex is not OperationCanceledException
&& ex is not TaskCanceledException)
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
!ex.Message.Contains("The request was canceled."))
{
SetFailureType(ex.Message);
if (TransferFailedEventHandler != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,16 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);
StorageResourceContainer subContainer =
_destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath);
await subContainer.CreateIfNotExistsAsync().ConfigureAwait(false);

try
{
await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
}
else
{
Expand Down
104 changes: 48 additions & 56 deletions sdk/storage/Azure.Storage.DataMovement/src/StreamToUriJobPart.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,21 @@ private async Task<bool> CreateDestinationResource(long blockSize, long length,
// Whether or not we continue is up to whether this was single put call or not.
return !singleCall;
}
catch (RequestFailedException exception)
when (_createMode == StorageResourceCreationPreference.SkipIfExists
&& exception.ErrorCode == "BlobAlreadyExists")
catch (RequestFailedException r)
when (r.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
}
catch (InvalidOperationException i)
when (i.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);
}

// Do not continue if we need to skip or there was an error.
return false;
}
Expand All @@ -268,66 +277,49 @@ private async Task<bool> CreateDestinationResource(long blockSize, long length,
/// </summary>
private async Task InitialUploadCall(long blockSize, long expectedLength, bool singleCall)
{
try
if (singleCall)
{
if (singleCall)
{
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
cancellationToken: _cancellationToken).ConfigureAwait(false);
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
cancellationToken: _cancellationToken).ConfigureAwait(false);

using Stream stream = result.Content;
await _destinationResource.CopyFromStreamAsync(
stream: stream,
overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists,
streamLength: blockSize,
completeLength: expectedLength,
cancellationToken: _cancellationToken).ConfigureAwait(false);
using Stream stream = result.Content;
await _destinationResource.CopyFromStreamAsync(
stream: stream,
overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists,
streamLength: blockSize,
completeLength: expectedLength,
cancellationToken: _cancellationToken).ConfigureAwait(false);

// Report bytes written before completion
ReportBytesWritten(blockSize);
// Report bytes written before completion
ReportBytesWritten(blockSize);

// Set completion status to completed
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
}
else
// Set completion status to completed
await OnTransferStateChangedAsync(DataTransferState.Completed).ConfigureAwait(false);
}
else
{
Stream slicedStream = Stream.Null;
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
position: 0,
length: blockSize,
cancellationToken: _cancellationToken).ConfigureAwait(false);
using (Stream stream = result.Content)
{
Stream slicedStream = Stream.Null;
StorageResourceReadStreamResult result = await _sourceResource.ReadStreamAsync(
position: 0,
length: blockSize,
slicedStream = await GetOffsetPartitionInternal(
stream,
0L,
blockSize,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false);
await _destinationResource.CopyFromStreamAsync(
stream: slicedStream,
streamLength: blockSize,
overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists,
completeLength: expectedLength,
cancellationToken: _cancellationToken).ConfigureAwait(false);
using (Stream stream = result.Content)
{
slicedStream = await GetOffsetPartitionInternal(
stream,
0L,
blockSize,
UploadArrayPool,
_cancellationToken).ConfigureAwait(false);
await _destinationResource.CopyFromStreamAsync(
stream: slicedStream,
streamLength: blockSize,
overwrite: _createMode == StorageResourceCreationPreference.OverwriteIfExists,
completeLength: expectedLength,
cancellationToken: _cancellationToken).ConfigureAwait(false);
}

ReportBytesWritten(blockSize);
}
}
catch (RequestFailedException ex)
when (ex.ErrorCode == "BlobAlreadyExists" && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
}
catch (InvalidOperationException ex)
when (ex.Message.Contains("Cannot overwrite file.") && _createMode == StorageResourceCreationPreference.SkipIfExists)
{
await InvokeSkippedArg().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArg(ex).ConfigureAwait(false);

ReportBytesWritten(blockSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,16 @@ private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
: current.Uri.GetPath().Substring(containerUriPath.Length + 1);
StorageResourceContainer subContainer =
_destinationResourceContainer.GetChildStorageResourceContainer(subContainerPath);
await subContainer.CreateIfNotExistsAsync().ConfigureAwait(false);

try
{
await subContainer.CreateIfNotExistsAsync(_cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ public async Task TriggerJobCancellationAsync()
/// <returns></returns>
public async virtual Task InvokeFailedArgAsync(Exception ex)
{
if (ex is not OperationCanceledException)
if (ex is not OperationCanceledException &&
ex is not TaskCanceledException &&
!ex.Message.Contains("The request was canceled."))
{
if (TransferFailedEventHandler != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ public async Task Upload(long objectSize, int waitTimeInSec)
GetNewObjectName(),
GetNewObjectName(),
};
Console.WriteLine($"files: {string.Join(", ", files)}");

CancellationToken cancellationToken = TestHelper.GetTimeoutToken(waitTimeInSec);
await SetupDirectoryAsync(
Expand All @@ -194,7 +193,7 @@ await UploadDirectoryAndVerifyAsync(
[TestCase(DataTransferErrorMode.StopOnAnyFailure)]
public async Task UploadFailIfExists(DataTransferErrorMode errorMode)
{
const int waitTimeInSec = 5;
const int waitTimeInSec = 10;
const int preexistingFileCount = 2;
const int skipCount = 1;
const int totalFileCount = skipCount + preexistingFileCount + 1;
Expand Down Expand Up @@ -260,7 +259,7 @@ await SetupDirectoryAsync(
[Test]
public async Task UploadSkipIfExists()
{
const int waitTimeInSec = 5;
const int waitTimeInSec = 10;
const int preexistingFileCount = 2;
const int skipCount = 1;
const int totalFileCount = skipCount + preexistingFileCount + 1;
Expand Down Expand Up @@ -386,11 +385,10 @@ await UploadDirectoryAndVerifyAsync(
}

[RecordedTest]
[TestCase(1)]
[TestCase(5)]
public async Task UploadEmpty(int folderDepth)
[TestCase(1, 10)]
[TestCase(5, 30)]
public async Task UploadEmpty(int folderDepth, int waitTimeInSec)
{
const int waitTimeInSec = 10;
// Arrange
using DisposingLocalDirectory disposingLocalDirectory = DisposingLocalDirectory.GetTestDirectory();
await using IDisposingContainer<TContainerClient> test = await GetDisposingContainerAsync();
Expand Down

0 comments on commit 7343ed9

Please sign in to comment.