Skip to content

Commit

Permalink
[Storage] Support upload 4TB Azure file
Browse files Browse the repository at this point in the history
  • Loading branch information
blueww committed Nov 13, 2020
1 parent 38898b9 commit 5740701
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public static class CustomAssemblyResolver
private static IDictionary<string, Version> NetFxPreloadAssemblies =
new Dictionary<string, Version>(StringComparer.InvariantCultureIgnoreCase)
{
{"Azure.Core", new Version("1.5.1.0")},
{"Azure.Core", new Version("1.6.0.0")},
{"Microsoft.Bcl.AsyncInterfaces", new Version("1.0.0.0")},
{"Microsoft.Identity.Client", new Version("4.21.0.0") },
{"Microsoft.Identity.Client.Extensions.Msal", new Version("2.16.2.0") },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.7.0-preview.1" />
<PackageReference Include="Azure.Storage.Files.DataLake" Version="12.5.0-preview.1" />
<PackageReference Include="Azure.Storage.Files.Shares" Version="12.5.0-preview.1" />
<PackageReference Include="Azure.Storage.Queues" Version="12.5.0-preview.1" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.7.0" />
<PackageReference Include="Azure.Storage.Files.DataLake" Version="12.5.0" />
<PackageReference Include="Azure.Storage.Files.Shares" Version="12.5.0" />
<PackageReference Include="Azure.Storage.Queues" Version="12.5.0" />
<PackageReference Include="Microsoft.Azure.Management.Storage" Version="17.2.0" />
</ItemGroup>

Expand Down
6 changes: 6 additions & 0 deletions src/Storage/Storage.Management/ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
- Additional information about change #1
-->
## Upcoming Release
* Support upload Azure File size up to 4 TiB
- `Set-AzStorageFileContent`
* Upgraded Azure.Storage..Blobs to 12.7.0
* Upgraded Azure.Storage.Files.Shares to 12.5.0
* Upgraded Azure.Storage.Files.DataLake to 12.5.0
* Upgraded Azure.Storage.Queues to 12.5.0

## Version 3.0.0
* Removed obsolete property RestorePolicy.LastEnabledTime
Expand Down
2 changes: 1 addition & 1 deletion src/Storage/Storage/Common/AzureStorageFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public AzureStorageFile(CloudFile file, AzureStorageContext storageContext)
}

// Convert Track1 File object to Track 2 file Client
protected static ShareFileClient GetTrack2FileClient(CloudFile cloudFile, AzureStorageContext context)
public static ShareFileClient GetTrack2FileClient(CloudFile cloudFile, AzureStorageContext context)
{
ShareFileClient fileClient;
if (cloudFile.ServiceClient.Credentials.IsSAS) //SAS
Expand Down
83 changes: 83 additions & 0 deletions src/Storage/Storage/Common/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.WindowsAzure.Commands.Storage.Common
using Microsoft.Azure.Storage.File;
using XTable = Microsoft.Azure.Cosmos.Table;
using System;
using System.IO;
using System.Globalization;
using System.Net;
using System.Threading.Tasks;
Expand All @@ -29,6 +30,7 @@ namespace Microsoft.WindowsAzure.Commands.Storage.Common
using System.Collections;
using global::Azure.Storage.Blobs;
using global::Azure.Storage;
using global::Azure.Storage.Files.Shares.Models;

internal static class Util
{
Expand Down Expand Up @@ -596,5 +598,86 @@ public static void ValidateUserDelegationKeyStartEndTime(DateTimeOffset userDele
return null;
}
}

public static FileAttributes AzureFileNtfsAttributesToLocalAttributes(NtfsFileAttributes cloudFileNtfsAttributes)
{
FileAttributes fileAttributes = FileAttributes.Normal;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.ReadOnly) == NtfsFileAttributes.ReadOnly)
fileAttributes |= FileAttributes.ReadOnly;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.Hidden) == NtfsFileAttributes.Hidden)
fileAttributes |= FileAttributes.Hidden;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.System) == NtfsFileAttributes.System)
fileAttributes |= FileAttributes.System;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.Directory) == NtfsFileAttributes.Directory)
fileAttributes |= FileAttributes.Directory;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.Archive) == NtfsFileAttributes.Archive)
fileAttributes |= FileAttributes.Archive;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.Temporary) == NtfsFileAttributes.Temporary)
fileAttributes |= FileAttributes.Temporary;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.Offline) == NtfsFileAttributes.Offline)
fileAttributes |= FileAttributes.Offline;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.NotContentIndexed) == NtfsFileAttributes.NotContentIndexed)
fileAttributes |= FileAttributes.NotContentIndexed;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.NoScrubData) == NtfsFileAttributes.NoScrubData)
fileAttributes |= FileAttributes.NoScrubData;

if ((cloudFileNtfsAttributes & NtfsFileAttributes.None) == NtfsFileAttributes.None)
{
if (fileAttributes != FileAttributes.Normal)
{
fileAttributes = fileAttributes & (~FileAttributes.Normal);
}
}

return fileAttributes;
}

public static NtfsFileAttributes LocalAttributesToAzureFileNtfsAttributes(FileAttributes fileAttributes)
{
NtfsFileAttributes cloudFileNtfsAttributes = NtfsFileAttributes.None;

if ((fileAttributes & FileAttributes.ReadOnly) == FileAttributes.ReadOnly)
cloudFileNtfsAttributes |= NtfsFileAttributes.ReadOnly;

if ((fileAttributes & FileAttributes.Hidden) == FileAttributes.Hidden)
cloudFileNtfsAttributes |= NtfsFileAttributes.Hidden;

if ((fileAttributes & FileAttributes.System) == FileAttributes.System)
cloudFileNtfsAttributes |= NtfsFileAttributes.System;

if ((fileAttributes & FileAttributes.Directory) == FileAttributes.Directory)
cloudFileNtfsAttributes |= NtfsFileAttributes.Directory;

if ((fileAttributes & FileAttributes.Archive) == FileAttributes.Archive)
cloudFileNtfsAttributes |= NtfsFileAttributes.Archive;

if ((fileAttributes & FileAttributes.Normal) == FileAttributes.Normal)
cloudFileNtfsAttributes |= NtfsFileAttributes.None;

if ((fileAttributes & FileAttributes.Temporary) == FileAttributes.Temporary)
cloudFileNtfsAttributes |= NtfsFileAttributes.Temporary;

if ((fileAttributes & FileAttributes.Offline) == FileAttributes.Offline)
cloudFileNtfsAttributes |= NtfsFileAttributes.Offline;

if ((fileAttributes & FileAttributes.NotContentIndexed) == FileAttributes.NotContentIndexed)
cloudFileNtfsAttributes |= NtfsFileAttributes.NotContentIndexed;

if ((fileAttributes & FileAttributes.NoScrubData) == FileAttributes.NoScrubData)
cloudFileNtfsAttributes |= NtfsFileAttributes.NoScrubData;

if (cloudFileNtfsAttributes != NtfsFileAttributes.None) cloudFileNtfsAttributes &= (~NtfsFileAttributes.None);

return cloudFileNtfsAttributes;
}
}
}
193 changes: 169 additions & 24 deletions src/Storage/Storage/File/Cmdlet/SetAzureStorageFileContent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@

namespace Microsoft.WindowsAzure.Commands.Storage.File.Cmdlet
{
using global::Azure;
using global::Azure.Storage.Files.Shares;
using global::Azure.Storage.Files.Shares.Models;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.DataMovement;
using Microsoft.Azure.Storage.File;
using Microsoft.WindowsAzure.Commands.Common;
using Microsoft.WindowsAzure.Commands.Common.Storage.ResourceModel;
using Microsoft.WindowsAzure.Commands.Storage.Common;
using Microsoft.WindowsAzure.Commands.Utilities.Common;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.File;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Management.Automation;
using System.Net;
using System.Runtime.InteropServices;
using System.Security.Cryptography;
using System.Threading.Tasks;
using LocalConstants = Microsoft.WindowsAzure.Commands.Storage.File.Constants;
using System.Runtime.InteropServices;
using Microsoft.Azure.Storage.DataMovement;
using Microsoft.WindowsAzure.Commands.Common.CustomAttributes;
using Microsoft.WindowsAzure.Commands.Common.Storage.ResourceModel;

[Cmdlet("Set", Azure.Commands.ResourceManager.Common.AzureRMConstants.AzurePrefix + "StorageFileContent", SupportsShouldProcess = true, DefaultParameterSetName = LocalConstants.ShareNameParameterSetName), OutputType(typeof(AzureStorageFile))]
public class SetAzureStorageFileContent : StorageFileDataManagementCmdletBase, IDynamicParameters
Expand Down Expand Up @@ -110,11 +114,19 @@ public override void ExecuteCmdlet()
{
throw new FileNotFoundException(string.Format(CultureInfo.CurrentCulture, Resources.SourceFileNotFound, this.Source));
}
long fileSize = localFile.Length;

// if FIPS policy is enabled, must use native MD5
// if FIPS policy is enabled, must use native MD5 for DMlib.
if (fipsEnabled)
{
CloudStorageAccount.UseV1MD5 = false;
if (fileSize < sizeTB)
{
CloudStorageAccount.UseV1MD5 = false;
}
else // use Track2 SDK
{
WriteWarning("The uploaded file won't have Content MD5 hash, since caculate MD5 hash fail, most possiblly caused by FIPS is enabled on this machine.");
}
}

bool isDirectory;
Expand All @@ -128,24 +140,146 @@ public override void ExecuteCmdlet()
this.RunTask(async taskId =>
{
var progressRecord = new ProgressRecord(
this.OutputStream.GetProgressId(taskId),
string.Format(CultureInfo.CurrentCulture, Resources.SendAzureFileActivity, localFile.Name,
cloudFileToBeUploaded.GetFullPath(), cloudFileToBeUploaded.Share.Name),
Resources.PrepareUploadingFile);

await DataMovementTransferHelper.DoTransfer(() =>
this.TransferManager.UploadAsync(
localFile.FullName,
cloudFileToBeUploaded,
new UploadOptions
this.OutputStream.GetProgressId(taskId),
string.Format(CultureInfo.CurrentCulture, Resources.SendAzureFileActivity, localFile.Name,
cloudFileToBeUploaded.GetFullPath(), cloudFileToBeUploaded.Share.Name),
Resources.PrepareUploadingFile);

if (fileSize <= sizeTB)
{


await DataMovementTransferHelper.DoTransfer(() =>
this.TransferManager.UploadAsync(
localFile.FullName,
cloudFileToBeUploaded,
new UploadOptions
{
PreserveSMBAttributes = context is null ? false : context.PreserveSMBAttribute.IsPresent
},
this.GetTransferContext(progressRecord, localFile.Length),
this.CmdletCancellationToken),
progressRecord,
this.OutputStream).ConfigureAwait(false);
}
else // use Track2 SDK
{
//Create File
ShareFileClient fileClient = AzureStorageFile.GetTrack2FileClient(cloudFileToBeUploaded, Channel.StorageContext);

// confirm overwrite if file exist
if(!this.Force.IsPresent &&
fileClient.Exists(this.CmdletCancellationToken) &&
!await this.OutputStream.ConfirmAsync(string.Format(CultureInfo.CurrentCulture, Resources.OverwriteConfirmation, Util.ConvertToString(cloudFileToBeUploaded))))
{
return;
}

await fileClient.CreateAsync(fileSize, cancellationToken: this.CmdletCancellationToken).ConfigureAwait(false);

//Prepare progress Handler
IProgress<long> progressHandler = new Progress<long>((finishedBytes) =>
{
if (progressRecord != null)
{
// Size of the source file might be 0, when it is, directly treat the progress as 100 percent.
progressRecord.PercentComplete = 0 == fileSize ? 100 : (int)(finishedBytes * 100 / fileSize);
progressRecord.StatusDescription = string.Format(CultureInfo.CurrentCulture, Resources.FileTransmitStatus, progressRecord.PercentComplete);
this.OutputStream.WriteProgress(progressRecord);
}
});

long blockSize = 4 * 1024 * 1024;
int maxWorkers = 4;

List<Task> runningTasks = new List<Task>();

IncrementalHash hash = null;
if (!fipsEnabled)
{
hash = IncrementalHash.CreateHash(HashAlgorithmName.MD5);
}

using (FileStream stream = File.OpenRead(localFile.FullName))
{
byte[] buffer = null;
long lastBlockSize = 0;
for (long offset = 0; offset < fileSize; offset += blockSize)
{
long currentBlockSize = offset + blockSize < fileSize ? blockSize : fileSize - offset;

// Only need to create new buffer when chunk size change
if (currentBlockSize != lastBlockSize)
{
buffer = new byte[currentBlockSize];
lastBlockSize = currentBlockSize;
}
await stream.ReadAsync(buffer: buffer, offset: 0, count: (int)currentBlockSize);
if (!fipsEnabled && hash != null)
{
hash.AppendData(buffer);
}

Task task = UploadFileRangAsync(fileClient,
new HttpRange(offset, currentBlockSize),
new MemoryStream(buffer),
progressHandler);
runningTasks.Add(task);

// Check if any of upload range tasks are still busy
if (runningTasks.Count >= maxWorkers)
{
await Task.WhenAny(runningTasks).ConfigureAwait(false);

// Clear any completed blocks from the task list
for (int i = 0; i < runningTasks.Count; i++)
{
Task runningTask = runningTasks[i];
if (!runningTask.IsCompleted)
{
continue;
}

await runningTask.ConfigureAwait(false);
runningTasks.RemoveAt(i);
i--;
}
}
}
// Wait for all upload range tasks finished
await Task.WhenAll(runningTasks).ConfigureAwait(false);
}

// Need set file properties
if ((!fipsEnabled && hash != null) || (context != null && context.PreserveSMBAttribute.IsPresent))
{
ShareFileHttpHeaders header = null;
if (!fipsEnabled && hash != null)
{
header = new ShareFileHttpHeaders();
header.ContentHash = hash.GetHashAndReset();
}

FileSmbProperties smbProperties = null;
if (context != null && context.PreserveSMBAttribute.IsPresent)
{
PreserveSMBAttributes = context is null ? false : context.PreserveSMBAttribute.IsPresent
},
this.GetTransferContext(progressRecord, localFile.Length),
this.CmdletCancellationToken),
progressRecord,
this.OutputStream).ConfigureAwait(false);
FileInfo sourceFileInfo = new FileInfo(localFile.FullName);
smbProperties = new FileSmbProperties();
smbProperties.FileCreatedOn = sourceFileInfo.CreationTimeUtc;
smbProperties.FileLastWrittenOn = sourceFileInfo.LastWriteTimeUtc;
smbProperties.FileAttributes = Util.LocalAttributesToAzureFileNtfsAttributes(File.GetAttributes(localFile.FullName));
}

// set file header and attributes to the file
fileClient.SetHttpHeaders(httpHeaders: header, smbProperties: smbProperties);
}

if (this.PassThru)
{
// fetch latest file properties for output
cloudFileToBeUploaded.FetchAttributes();
}
}

if (this.PassThru)
{
Expand All @@ -160,6 +294,17 @@ await DataMovementTransferHelper.DoTransfer(() =>
}
}

private long Finishedbytes = 0;
private async Task UploadFileRangAsync(ShareFileClient file, HttpRange range, Stream content, IProgress<long> progressHandler = null)
{
await file.UploadRangeAsync(
range,
content,
cancellationToken: this.CmdletCancellationToken).ConfigureAwait(false);
Finishedbytes += range.Length is null? 0 : range.Length.Value;
progressHandler.Report(Finishedbytes);
}

private async Task<CloudFile> BuildCloudFileInstanceFromPathAsync(string defaultFileName, string[] path, bool pathIsDirectory)
{
CloudFileDirectory baseDirectory = null;
Expand Down
Loading

0 comments on commit 5740701

Please sign in to comment.