Skip to content

Commit

Permalink
DataMovement unify TransferAndVerify Part 1 (#39300)
Browse files Browse the repository at this point in the history
* initial unified implementation

* applied to blob single download tests

* move local tooling

* checkpoint

* separate out blob utilities

* separating item and container test util

* cancellation tokens

* using single blob

* apply to append tests

* added page blobs

* remove old verify methods
  • Loading branch information
jaschrep-msft authored Oct 18, 2023
1 parent d13cc6b commit 62c83cc
Show file tree
Hide file tree
Showing 5 changed files with 493 additions and 447 deletions.
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.DataMovement/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.DataMovement",
"Tag": "net/storage/Azure.Storage.DataMovement_bcc0ae9f03"
"Tag": "net/storage/Azure.Storage.DataMovement_0682ba876f"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
internal partial class TransferValidator
{
public class LocalFileResourceEnumerationItem : IResourceEnumerationItem
{
private readonly string _localFilePath;

public string RelativePath { get; }

public LocalFileResourceEnumerationItem(string localFilePath, string relativePath)
{
_localFilePath = localFilePath;
RelativePath = relativePath;
}

public Task<Stream> OpenReadAsync(CancellationToken cancellationToken)
{
return Task.FromResult(File.OpenRead(_localFilePath) as Stream);
}
}

public static ListFilesAsync GetLocalFileLister(string directoryPath)
{
Task<List<IResourceEnumerationItem>> ListFiles(CancellationToken cancellationToken)
{
List<IResourceEnumerationItem> result = new();
Queue<string> directories = new();
directories.Enqueue(directoryPath);
while (directories.Count > 0)
{
string workingDir = directories.Dequeue();
foreach (string dirPath in Directory.GetDirectories(workingDir))
{
directories.Enqueue(dirPath);
}
foreach (string filePath in Directory.GetFiles(workingDir))
{
result.Add(new LocalFileResourceEnumerationItem(filePath, filePath.Substring(workingDir.Length)));
}
}
return Task.FromResult(result);
}
return ListFiles;
}

public static ListFilesAsync GetLocalFileListerSingle(string filePath, string relativePath)
{
Task<List<IResourceEnumerationItem>> ListFiles(CancellationToken cancellationToken)
{
return Task.FromResult(new List<IResourceEnumerationItem>()
{
new LocalFileResourceEnumerationItem(filePath, relativePath)
});
}
return ListFiles;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Azure.Storage.DataMovement.Tests
{
internal partial class TransferValidator
{
public interface IResourceEnumerationItem
{
string RelativePath { get; }
Task<Stream> OpenReadAsync(CancellationToken cancellationToken);
}

public delegate Task<List<IResourceEnumerationItem>> ListFilesAsync(CancellationToken cancellationToken);

public TransferManager TransferManager { get; init; } = new(new TransferManagerOptions()
{
ErrorHandling = DataTransferErrorMode.ContinueOnFailure
});

public async Task TransferAndVerifyAsync(
StorageResourceContainer sourceResource,
StorageResourceContainer destinationResource,
ListFilesAsync getSourceFilesAsync,
ListFilesAsync getDestinationFilesAsync,
int expectedItemTransferCount,
DataTransferOptions options = default,
CancellationToken cancellationToken = default)
{
options ??= new DataTransferOptions();
TestEventsRaised testEventsRaised = new TestEventsRaised(options);

DataTransfer transfer = await TransferManager.StartTransferAsync(
sourceResource,
destinationResource,
options,
cancellationToken);
await transfer.WaitForCompletionAsync(cancellationToken);

await testEventsRaised.AssertContainerCompletedCheck(expectedItemTransferCount);
Assert.IsTrue(transfer.HasCompleted);
Assert.AreEqual(DataTransferState.Completed, transfer.TransferStatus.State);

List<IResourceEnumerationItem> sourceFiles = await getSourceFilesAsync(cancellationToken);
List<IResourceEnumerationItem> destinationFiles = await getDestinationFilesAsync(cancellationToken);

Assert.That(sourceFiles.Count, Is.EqualTo(expectedItemTransferCount));
Assert.That(destinationFiles.Count, Is.EqualTo(expectedItemTransferCount));

sourceFiles.Sort((left, right) => left.RelativePath.CompareTo(right.RelativePath));
destinationFiles.Sort((left, right) => left.RelativePath.CompareTo(right.RelativePath));

foreach (int i in Enumerable.Range(0, expectedItemTransferCount))
{
using Stream sourceStream = await sourceFiles[i].OpenReadAsync(cancellationToken);
using Stream destinationStream = await destinationFiles[i].OpenReadAsync(cancellationToken);
Assert.That(sourceStream, Is.EqualTo(destinationStream));
}
}

public async Task TransferAndVerifyAsync(
StorageResourceItem sourceResource,
StorageResourceItem destinationResource,
Func<CancellationToken, Task<Stream>> openReadSourceAsync,
Func<CancellationToken, Task<Stream>> openReadDestinationAsync,
DataTransferOptions options = default,
CancellationToken cancellationToken = default)
{
DataTransfer transfer = await TransferManager.StartTransferAsync(
sourceResource,
destinationResource,
options,
cancellationToken);
await transfer.WaitForCompletionAsync(cancellationToken);

using Stream sourceStream = await openReadSourceAsync(cancellationToken);
using Stream destinationStream = await openReadDestinationAsync(cancellationToken);
Assert.That(sourceStream, Is.EqualTo(destinationStream));
}
}
}
Loading

0 comments on commit 62c83cc

Please sign in to comment.