diff --git a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs index f3a1c5aa0e2ab..db68b79157743 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs @@ -356,24 +356,38 @@ public virtual async Task StartTransferAsync( transferOptions ??= new DataTransferOptions(); string transferId = _generateTransferId(); - await _checkpointer.AddNewJobAsync( - transferId, - sourceResource, - destinationResource, - cancellationToken).ConfigureAwait(false); - - // TODO: if the below fails for any reason, this job will still be in the checkpointer. - // That seems not desirable. - - DataTransfer dataTransfer = await BuildAndAddTransferJobAsync( - sourceResource, - destinationResource, - transferOptions, - transferId, - false, - cancellationToken).ConfigureAwait(false); - - return dataTransfer; + try + { + await _checkpointer.AddNewJobAsync( + transferId, + sourceResource, + destinationResource, + cancellationToken).ConfigureAwait(false); + + DataTransfer dataTransfer = await BuildAndAddTransferJobAsync( + sourceResource, + destinationResource, + transferOptions, + transferId, + false, + cancellationToken).ConfigureAwait(false); + + return dataTransfer; + } + catch (Exception ex) + { + // cleanup any state for a job that didn't even start + try + { + _dataTransfers.Remove(transferId); + await _checkpointer.TryRemoveStoredTransferAsync(transferId, cancellationToken).ConfigureAwait(false); + } + catch (Exception cleanupEx) + { + throw new AggregateException(ex, cleanupEx); + } + throw; + } } private async Task BuildAndAddTransferJobAsync( diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs index 18d972f649cc3..0181a0221a567 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/TransferManagerTests.cs @@ -14,6 +14,7 @@ using Azure.Storage.DataMovement.Tests.Shared; using Moq; using NUnit.Framework; +using NUnit.Framework.Constraints; namespace Azure.Storage.DataMovement.Tests; @@ -285,6 +286,7 @@ public async Task BasicContainerTransfer( [Combinatorial] public async Task TransferFailAtQueue( [Values(0, 1)] int failAt, + [Values(true, false)] bool throwCleanup, [Values(true, false)] bool isContainer) { Uri srcUri = new("file:///foo/bar"); @@ -301,19 +303,34 @@ public async Task TransferFailAtQueue( = GetBasicSetupResources(isContainer, srcUri, dstUri); Exception expectedException = new(); - switch (failAt) + Exception cleanupException = throwCleanup ? new() : null; + List capturedTransferIds = new(); { - case 0: - jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny()) - ).Throws(expectedException); - break; - case 1: - checkpointer.Setup(c => c.AddNewJobAsync(It.IsAny(), It.IsAny(), - It.IsAny(), It.IsAny()) - ).Throws(expectedException); - break; + var checkpointerAddJob = checkpointer.Setup(c => c.AddNewJobAsync(Capture.In(capturedTransferIds), + It.IsAny(), It.IsAny(), It.IsAny())); + var checkpointerRemoveJob = checkpointer.Setup(c => c.TryRemoveStoredTransferAsync( + It.IsAny(), It.IsAny())); + + switch (failAt) + { + case 0: + jobBuilder.Setup(b => b.BuildJobAsync(It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), + It.IsAny(), It.IsAny()) + ).Throws(expectedException); + break; + case 1: + checkpointerAddJob.Throws(expectedException); + break; + } + if (throwCleanup) + { + checkpointerRemoveJob.Throws(cleanupException); + } + else + { + checkpointerRemoveJob.Returns(Task.FromResult(true)); + } } await using TransferManager transferManager = new( @@ -325,15 +342,21 @@ public async Task TransferFailAtQueue( default); DataTransfer transfer = null; - - Assert.That(async () => transfer = await transferManager.StartTransferAsync( - srcResource, - dstResource), Throws.Exception.EqualTo(expectedException)); + IConstraint throwsConstraint = throwCleanup + ? Throws.TypeOf().And.Property(nameof(AggregateException.InnerExceptions)) + .EquivalentTo(new List() { expectedException, cleanupException }) + : Throws.Exception.EqualTo(expectedException); + Assert.That(async () => transfer = await transferManager.StartTransferAsync(srcResource, dstResource), + throwsConstraint); Assert.That(transfer, Is.Null); - // TODO determine if checkpointer still has the job tracked even though it failed to queue (it shouldn't) - // need checkpointer API refactor for this + Assert.That(capturedTransferIds.Count, Is.EqualTo(1)); + checkpointer.Verify(c => c.AddNewJobAsync(capturedTransferIds.First(), It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + checkpointer.Verify(c => c.TryRemoveStoredTransferAsync(capturedTransferIds.First(), + It.IsAny()), Times.Once); + checkpointer.VerifyNoOtherCalls(); } [Test]