diff --git a/.gitignore b/.gitignore
index ab19420..5c3e57e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,12 +1,13 @@
-.vs/*
-*/obj/*
-*/bin/*
-packages
-*.user
-/coverage-opencover.xml
-launchSettings.json
-*/logs/*
-*.db
-/TCC.Tests/'coverage.xml'
-/coverage.xml
-/published
+.vs/*
+*/obj/*
+*/bin/*
+packages
+*.user
+/coverage-opencover.xml
+launchSettings.json
+*/logs/*
+*.db
+/TCC.Tests/'coverage.xml'
+/coverage.xml
+/published
+.idea
\ No newline at end of file
diff --git a/TCC.Lib/Helpers/StringExtensions.cs b/TCC.Lib/Helpers/StringExtensions.cs
index 4f239fa..5bd238b 100644
--- a/TCC.Lib/Helpers/StringExtensions.cs
+++ b/TCC.Lib/Helpers/StringExtensions.cs
@@ -1,102 +1,120 @@
-using System;
-using System.Globalization;
-using System.IO;
-
-namespace TCC.Lib.Helpers
-{
-    public static class StringExtensions
-    {
-        public static string Escape(this string str)
-        {
-            return '"' + str.Trim('"') + '"';
-        }
-
-        public static string HumanizedBandwidth(this double bandwidth, int decimals = 2)
-        {
-            var ordinals = new[] { "", "K", "M", "G", "T", "P", "E" };
-            var rate = (decimal)bandwidth;
-            var ordinal = 0;
-            while (rate > 1024)
-            {
-                rate /= 1024;
-                ordinal++;
-            }
-            return String.Format("{0:n" + decimals + "} {1}b/s", Math.Round(rate, decimals, MidpointRounding.AwayFromZero), ordinals[ordinal]);
-        }
-
-        public static String HumanizeSize(this long size)
-        {
-            string[] suf = { "B", "Ko", "Mo", "Go", "To", "Po", "Eo" };
-            if (size == 0)
-                return "0" + suf[0];
-            long bytes = Math.Abs(size);
-            int place = Convert.ToInt32(Math.Floor(Math.Log(bytes, 1024)));
-            double num = Math.Round(bytes / Math.Pow(1024, place), 1);
-            return (Math.Sign(size) * num).ToString(CultureInfo.InvariantCulture) + " " + suf[place];
-        }
-
-        public static string HumanizedTimeSpan(this TimeSpan t, int parts = 2)
-        {
-            string result = string.Empty;
-            if (t.TotalDays >= 1 && parts > 0)
-            {
-                result += $"{t.Days}d ";
-                parts--;
-            }
-            if (t.TotalHours >= 1 && parts > 0)
-            {
-                result += $"{t.Hours}h ";
-                parts--;
-            }
-            if (t.TotalMinutes >= 1 && parts > 0)
-            {
-                result += $"{t.Minutes}m ";
-                parts--;
-            }
-            if (t.Seconds >= 1 && parts > 0)
-            {
-                result += $"{t.Seconds}s ";
-                parts--;
-            }
-            if (t.Milliseconds >= 1 && parts > 0)
-            {
-                result += $"{t.Milliseconds}ms";
-            }
-            return result.TrimEnd();
-        }
-
-        public static string Pad(this string source, int length)
-        {
-            if (source == null)
-            {
-                return new string(' ', length);
-            }
-            return source.Length > length ? source.Substring(0, length) : source.PadLeft(length, ' ');
-        }
-
-        public static (string Name, DateTime? Date) ExtractArchiveNameAndDate(this string filePath)
-        {
-            if (filePath == null)
-                throw new ArgumentNullException(nameof(filePath));
-
-            string segment = Path.GetFileNameWithoutExtension(filePath);
-            if (segment.EndsWith(".diff") || segment.EndsWith(".full"))
-            {
-                segment = segment.Substring(0, segment.Length - 5);
-            }
-
-            var lastSegment = segment.LastIndexOf('_');
-            if (lastSegment > 0)
-            {
-                string name = segment.Substring(0, lastSegment);
-                string date = segment.Substring(lastSegment + 1);
-                if (date.TryParseArchiveDateTime(out var dt))
-                {
-                    return (name, dt);
-                }
-                return (name, null);
-            }
-            return (segment, null);
-        }
-    }
+using System;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+
+namespace TCC.Lib.Helpers
+{
+    public static class StringExtensions
+    {
+        public static string Escape(this string str)
+        {
+            return '"' + str.Trim('"') + '"';
+        }
+
+        public static string HumanizedBandwidth(this double bandwidth, int decimals = 2)
+        {
+            var ordinals = new[] { "", "K", "M", "G", "T", "P", "E" };
+            var rate = (decimal)bandwidth;
+            var ordinal = 0;
+            while (rate > 1024)
+            {
+                rate /= 1024;
+                ordinal++;
+            }
+            return String.Format("{0:n" + decimals + "} {1}b/s", Math.Round(rate, decimals, MidpointRounding.AwayFromZero), ordinals[ordinal]);
+        }
+
+        public static String HumanizeSize(this long size)
+        {
+            string[] suf = { "B", "Ko", "Mo", "Go", "To", "Po", "Eo" };
+            if (size == 0)
+                return "0" + suf[0];
+            long bytes = Math.Abs(size);
+            int place = Convert.ToInt32(Math.Floor(Math.Log(bytes, 1024)));
+            double num = Math.Round(bytes / Math.Pow(1024, place), 1);
+            return (Math.Sign(size) * num).ToString(CultureInfo.InvariantCulture) + " " + suf[place];
+        }
+
+        public static long ParseSize(this string humanizedSize)
+        {
+            if (string.IsNullOrWhiteSpace(humanizedSize))
+                return -1;
+            string[] suf = { "b", "ko", "mo", "go", "to", "po", "eo" };
+            var size = humanizedSize.Trim().ToLower(CultureInfo.InvariantCulture);
+            var number = string.Join("", size.Where(char.IsDigit));
+            // The unit is the letter part: "64Mo" => "mo", a bare "64" has no unit at all.
+            // (Taking the last two characters would crash on single-character inputs
+            // and never match the one-letter "b" suffix.)
+            var unit = string.Join("", size.Where(char.IsLetter));
+            var pow = Array.IndexOf(suf, unit);
+
+            return pow switch
+            {
+                -1 => long.Parse(number, CultureInfo.InvariantCulture),
+                _ => long.Parse(number, CultureInfo.InvariantCulture) * (long)Math.Pow(1024L, pow)
+            };
+        }
+
+        public static string HumanizedTimeSpan(this TimeSpan t, int parts = 2)
+        {
+            string result = string.Empty;
+            if (t.TotalDays >= 1 && parts > 0)
+            {
+                result += $"{t.Days}d ";
+                parts--;
+            }
+            if (t.TotalHours >= 1 && parts > 0)
+            {
+                result += $"{t.Hours}h ";
+                parts--;
+            }
+            if (t.TotalMinutes >= 1 && parts > 0)
+            {
+                result += $"{t.Minutes}m ";
+                parts--;
+            }
+            if (t.Seconds >= 1 && parts > 0)
+            {
+                result += $"{t.Seconds}s ";
+                parts--;
+            }
+            if (t.Milliseconds >= 1 && parts > 0)
+            {
+                result += $"{t.Milliseconds}ms";
+            }
+            return result.TrimEnd();
+        }
+
+        public static string Pad(this string source, int length)
+        {
+            if (source == null)
+            {
+                return new string(' ', length);
+            }
+            return source.Length > length ? source.Substring(0, length) : source.PadLeft(length, ' ');
+        }
+
+        public static (string Name, DateTime? Date) ExtractArchiveNameAndDate(this string filePath)
+        {
+            if (filePath == null)
+                throw new ArgumentNullException(nameof(filePath));
+
+            string segment = Path.GetFileNameWithoutExtension(filePath);
+            if (segment.EndsWith(".diff") || segment.EndsWith(".full"))
+            {
+                segment = segment.Substring(0, segment.Length - 5);
+            }
+
+            var lastSegment = segment.LastIndexOf('_');
+            if (lastSegment > 0)
+            {
+                string name = segment.Substring(0, lastSegment);
+                string date = segment.Substring(lastSegment + 1);
+                if (date.TryParseArchiveDateTime(out var dt))
+                {
+                    return (name, dt);
+                }
+                return (name, null);
+            }
+            return (segment, null);
+        }
+    }
 }
\ No newline at end of file
diff --git a/TCC.Lib/OperationBlock.cs b/TCC.Lib/OperationBlock.cs
index f5e7955..ebbda5e 100644
--- a/TCC.Lib/OperationBlock.cs
+++ b/TCC.Lib/OperationBlock.cs
@@ -236,6 +236,7 @@ public class StepResult
         public string Errors { get; set; }
         public string Warning { get; set; }
         public string Infos { get; set; }
+        public UploadMode? UploadMode { get; set; }
         public bool IsSuccess => !HasError && !HasWarning;
         public bool HasError => !string.IsNullOrWhiteSpace(Errors);
         public bool HasWarning => !string.IsNullOrWhiteSpace(Warning);
diff --git a/TCC.Lib/Options/CompressOption.cs b/TCC.Lib/Options/CompressOption.cs
index f64aef4..c51c58f 100644
--- a/TCC.Lib/Options/CompressOption.cs
+++ b/TCC.Lib/Options/CompressOption.cs
@@ -1,27 +1,36 @@
-using System.Collections.Generic;
-using TCC.Lib.Blocks;
-using TCC.Lib.Database;
-
-namespace TCC.Lib.Options
-{
-    public class CompressOption : TccOption
-    {
-        public BlockMode BlockMode { get; set; }
-        public CompressionAlgo Algo { get; set; }
-        public int CompressionRatio { get; set; }
-        public BackupMode? BackupMode { get; set; }
-        public int? RetryPeriodInSeconds { get; set; }
-        public IEnumerable<string> Filter { get; set; }
-        public IEnumerable<string> Exclude { get; set; }
-        public bool FolderPerDay { get; set; }
-        public int? BoostRatio { get; set; }
-        public int? CleanupTime { get; set; }
-        public string AzBlobUrl { get; set; }
-        public string AzBlobContainer { get; set; }
-        public string AzBlobSaS { get; set; }
-        public int? AzThread { get; set; }
-        public string GoogleStorageBucketName { get; set; }
-        public string GoogleStorageCredential { get; set; }
-        public UploadMode? UploadMode { get; set; }
-    }
+using System.Collections.Generic;
+using System.Linq;
+using TCC.Lib.Blocks;
+using TCC.Lib.Database;
+
+namespace TCC.Lib.Options
+{
+    public class CompressOption : TccOption
+    {
+        public BlockMode BlockMode { get; set; }
+        public CompressionAlgo Algo { get; set; }
+        public int CompressionRatio { get; set; }
+        public BackupMode? BackupMode { get; set; }
+        public int? RetryPeriodInSeconds { get; set; }
+        public IEnumerable<string> Filter { get; set; }
+        public IEnumerable<string> Exclude { get; set; }
+        public bool FolderPerDay { get; set; }
+        public int? BoostRatio { get; set; }
+        public int? CleanupTime { get; set; }
+        public string AzBlobUrl { get; set; }
+        public string AzBlobContainer { get; set; }
+        public string AzBlobSaS { get; set; }
+        public int? AzThread { get; set; }
+        public string GoogleStorageBucketName { get; set; }
+        public string GoogleStorageCredential { get; set; }
+        public string S3AccessKeyId { get; set; }
+        public string S3SecretAcessKey { get; set; }
+        public string S3Host { get; set; }
+        public string S3BucketName { get; set; }
+        public string S3Region { get; set; }
+        public string S3MultipartThreshold { get; set; }
+        public string S3MultipartSize { get; set; }
+        public IEnumerable<UploadMode> UploadModes { get; set; } = Enumerable.Empty<UploadMode>();
+        public UploadMode? UploadMode { get; set; }
+    }
 }
\ No newline at end of file
diff --git a/TCC.Lib/ReadOnlyChunkedStream.cs b/TCC.Lib/ReadOnlyChunkedStream.cs
new file mode 100644
index 0000000..9adfb4c
--- /dev/null
+++ b/TCC.Lib/ReadOnlyChunkedStream.cs
@@ -0,0 +1,48 @@
+using System;
+using System.IO;
+
+namespace TCC.Lib
+{
+    public class ReadOnlyChunkedStream : Stream
+    {
+        private readonly Stream _inputStream;
+        private readonly int _chunkSize;
+
+        public ReadOnlyChunkedStream(Stream inputStream, int chunkSize)
+        {
+            _inputStream = inputStream;
+            _chunkSize = chunkSize;
+        }
+
+        public override void Flush() => _inputStream.Flush();
+
+        public override int Read(byte[] buffer, int offset, int count)
+        {
+            var maxRead = (int)Math.Min(count, _chunkSize - Position);
+            var nbRead = _inputStream.Read(buffer, offset, maxRead);
+            Position += nbRead;
+            return nbRead;
+        }
+
+        public override long Seek(long offset, SeekOrigin origin)
+        {
+            return _inputStream.Seek(offset, origin);
+        }
+
+        public override void SetLength(long value)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void Write(byte[] buffer, int offset, int count)
+        {
+            throw new NotSupportedException();
+        }
+
+        public override bool CanRead => _inputStream.CanRead;
+        public override bool CanSeek => _inputStream.CanSeek;
+        public override bool CanWrite => false;
+        public override long Length => Math.Min(_chunkSize, _inputStream.Length - _inputStream.Position);
+        public override long Position { get; set; }
+    }
+}
\ No newline at end of file
diff --git a/TCC.Lib/Storage/AzureRemoteStorage.cs b/TCC.Lib/Storage/AzureRemoteStorage.cs
index 43287c7..943cd45 100644
--- a/TCC.Lib/Storage/AzureRemoteStorage.cs
+++ b/TCC.Lib/Storage/AzureRemoteStorage.cs
@@ -24,6 +24,9 @@ public async Task<UploadResponse> UploadAsync(string targetPath, Stream data, CancellationToken token)
                 ErrorMessage = response.ReasonPhrase,
                 RemoteFilePath = targetPath
             };
-        }
+        }
+
+        public UploadMode Mode => UploadMode.AzureSdk;
+    }
 }
\ No newline at end of file
diff --git a/TCC.Lib/Storage/GoogleRemoteStorage.cs b/TCC.Lib/Storage/GoogleRemoteStorage.cs
index 2b85cdc..18045eb 100644
--- a/TCC.Lib/Storage/GoogleRemoteStorage.cs
+++ b/TCC.Lib/Storage/GoogleRemoteStorage.cs
@@ -38,6 +38,8 @@ public async Task<UploadResponse> UploadAsync(string targetPath, Stream data, CancellationToken token)
                 IsSuccess = true,
                 RemoteFilePath = targetPath
             };
-        }
+        }
+
+        public UploadMode Mode => UploadMode.GoogleCloudStorage;
     }
 }
\ No newline at end of file
diff --git a/TCC.Lib/Storage/IRemoteStorage.cs b/TCC.Lib/Storage/IRemoteStorage.cs
index 273348e..db402a2 100644
--- a/TCC.Lib/Storage/IRemoteStorage.cs
+++ b/TCC.Lib/Storage/IRemoteStorage.cs
@@ -8,12 +8,14 @@ namespace TCC.Lib.Storage
     public interface IRemoteStorage
     {
         Task<UploadResponse> UploadAsync(string targetPath, Stream data, CancellationToken token);
-
-        public async Task<UploadResponse> UploadAsync(FileInfo file, DirectoryInfo rootFolder, CancellationToken token)
+
+        async Task<UploadResponse> UploadAsync(FileInfo file, DirectoryInfo rootFolder, CancellationToken token)
         {
             string targetPath = file.GetRelativeTargetPathTo(rootFolder);
             await using FileStream uploadFileStream = File.OpenRead(file.FullName);
             return await UploadAsync(targetPath, uploadFileStream, token);
-        }
+        }
+
+        UploadMode Mode { get; }
     }
 }
\ No newline at end of file
diff --git a/TCC.Lib/Storage/NoneRemoteStorage.cs b/TCC.Lib/Storage/NoneRemoteStorage.cs
index eb3e942..6ee437e 100644
--- a/TCC.Lib/Storage/NoneRemoteStorage.cs
+++ b/TCC.Lib/Storage/NoneRemoteStorage.cs
@@ -9,6 +9,8 @@ public class NoneRemoteStorage : IRemoteStorage
         public Task<UploadResponse> UploadAsync(string targetPath, Stream data, CancellationToken token)
         {
             return Task.FromResult(new UploadResponse { IsSuccess = true, RemoteFilePath = targetPath });
-        }
+        }
+
+        public UploadMode Mode => UploadMode.None;
     }
 }
diff --git a/TCC.Lib/Storage/RemoteStorageFactory.cs b/TCC.Lib/Storage/RemoteStorageFactory.cs
index 8d97434..5162e1d 100644
--- a/TCC.Lib/Storage/RemoteStorageFactory.cs
+++ b/TCC.Lib/Storage/RemoteStorageFactory.cs
@@ -1,53 +1,116 @@
-using Azure.Storage.Blobs;
-using Google.Apis.Auth.OAuth2;
-using Google.Cloud.Storage.V1;
-using Microsoft.Extensions.Logging;
-using System;
-using System.IO;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-using TCC.Lib.Helpers;
-using TCC.Lib.Options;
-
-namespace TCC.Lib.Storage
-{
-    public static class RemoteStorageFactory
-    {
-        public static async Task<IRemoteStorage> GetRemoteStorageAsync(this CompressOption option, ILogger logger, CancellationToken token)
-        {
-            switch (option.UploadMode)
-            {
-                case UploadMode.AzureSdk:
-                {
-                    if (string.IsNullOrEmpty(option.AzBlobUrl)
-                        || string.IsNullOrEmpty(option.AzBlobContainer)
-                        || string.IsNullOrEmpty(option.AzBlobSaS))
-                    {
-                        logger.LogCritical("Configuration error for azure blob upload");
-                        return new NoneRemoteStorage();
-                    }
-                    var client = new BlobServiceClient(new Uri(option.AzBlobUrl + "/" + option.AzBlobContainer + "?" + option.AzBlobSaS));
-                    BlobContainerClient container = client.GetBlobContainerClient(option.AzBlobContainer);
-                    return new AzureRemoteStorage(container);
-                }
-                case UploadMode.GoogleCloudStorage:
-                {
-                    if (string.IsNullOrEmpty(option.GoogleStorageCredential)
-                        || string.IsNullOrEmpty(option.GoogleStorageBucketName))
-                    {
-                        logger.LogCritical("Configuration error for google storage upload");
-                        return new NoneRemoteStorage();
-                    }
-                    StorageClient storage = await GoogleAuthHelper.GetGoogleStorageClientAsync(option.GoogleStorageCredential, token);
-                    return new GoogleRemoteStorage(storage, option.GoogleStorageBucketName);
-                }
-                case UploadMode.None:
-                case null:
-                    return new NoneRemoteStorage();
-                default:
-                    throw new ArgumentOutOfRangeException();
-            }
-        }
-    }
+using Amazon.Runtime;
+using Amazon.S3;
+using Azure.Storage.Blobs;
+using Google.Cloud.Storage.V1;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using TCC.Lib.Helpers;
+using TCC.Lib.Options;
+
+namespace TCC.Lib.Storage
+{
+    public static class RemoteStorageFactory
+    {
+        public static async IAsyncEnumerable<IRemoteStorage> GetRemoteStoragesAsync(this CompressOption option, ILogger logger, [EnumeratorCancellation] CancellationToken token)
+        {
+            if (option.UploadMode.HasValue)
+            {
+                option.UploadModes = option.UploadModes.Append(option.UploadMode.Value);
+            }
+            option.UploadModes = option.UploadModes.Distinct();
+
+            foreach (var mode in option.UploadModes)
+            {
+                switch (mode)
+                {
+                    case UploadMode.AzureSdk:
+                    {
+                        if (string.IsNullOrEmpty(option.AzBlobUrl))
+                        {
+                            logger.LogCritical("[AzureBlobStorage] Configuration error: missing/invalid --AzBlobUrl");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.AzBlobContainer))
+                        {
+                            logger.LogCritical("[AzureBlobStorage] Configuration error: missing/invalid --AzBlobContainer");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.AzBlobSaS))
+                        {
+                            logger.LogCritical("[AzureBlobStorage] Configuration error: missing/invalid --AzBlobSaS");
+                            continue;
+                        }
+                        var client = new BlobServiceClient(new Uri(option.AzBlobUrl + "/" + option.AzBlobContainer + "?" + option.AzBlobSaS));
+                        BlobContainerClient container = client.GetBlobContainerClient(option.AzBlobContainer);
+                        yield return new AzureRemoteStorage(container);
+                        break;
+                    }
+                    case UploadMode.GoogleCloudStorage:
+                    {
+                        if (string.IsNullOrEmpty(option.GoogleStorageCredential))
+                        {
+                            logger.LogCritical("[GoogleStorage] Configuration error: missing/invalid --GoogleStorageCredential");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.GoogleStorageBucketName))
+                        {
+                            logger.LogCritical("[GoogleStorage] Configuration error: missing/invalid --GoogleStorageBucketName");
+                            continue;
+                        }
+                        StorageClient storage = await GoogleAuthHelper.GetGoogleStorageClientAsync(option.GoogleStorageCredential, token);
+                        yield return new GoogleRemoteStorage(storage, option.GoogleStorageBucketName);
+                        break;
+                    }
+                    case UploadMode.S3:
+                        if (string.IsNullOrEmpty(option.S3AccessKeyId))
+                        {
+                            logger.LogCritical("[S3Storage] Configuration error: missing/invalid --S3AccessKeyId");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.S3Host))
+                        {
+                            logger.LogCritical("[S3Storage] Configuration error: missing/invalid --S3Host");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.S3Region))
+                        {
+                            logger.LogCritical("[S3Storage] Configuration error: missing/invalid --S3Region");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.S3BucketName))
+                        {
+                            logger.LogCritical("[S3Storage] Configuration error: missing/invalid --S3BucketName");
+                            continue;
+                        }
+                        if (string.IsNullOrEmpty(option.S3SecretAcessKey))
+                        {
+                            logger.LogCritical("[S3Storage] Configuration error: missing/invalid --S3SecretAcessKey");
+                            continue;
+                        }
+
+                        var credentials = new BasicAWSCredentials(option.S3AccessKeyId, option.S3SecretAcessKey);
+                        var s3Config = new AmazonS3Config
+                        {
+                            AuthenticationRegion = option.S3Region,
+                            ServiceURL = option.S3Host,
+                        };
+
+                        yield return new S3RemoteStorage(
+                            new AmazonS3Client(credentials, s3Config),
+                            option.S3BucketName,
+                            option.S3MultipartThreshold.ParseSize(),
+                            (int)option.S3MultipartSize.ParseSize());
+                        break;
+                    case UploadMode.None:
+                        break;
+                    default:
+                        throw new ArgumentOutOfRangeException();
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/TCC.Lib/Storage/S3RemoteStorage.cs b/TCC.Lib/Storage/S3RemoteStorage.cs
new file mode 100644
index 0000000..89b2396
--- /dev/null
+++ b/TCC.Lib/Storage/S3RemoteStorage.cs
@@ -0,0 +1,101 @@
+using Amazon.S3;
+using Amazon.S3.Model;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace TCC.Lib.Storage
+{
+    public class S3RemoteStorage : IRemoteStorage
+    {
+        internal string BucketName { get; }
+        private readonly AmazonS3Client _s3Client;
+        private readonly long _multipartThreshold;
+        private readonly int _partSize;
+
+        public S3RemoteStorage(AmazonS3Client s3Client, string bucketName, long multipartThreshold = 0, int partSize = 0)
+        {
+            BucketName = bucketName;
+            _s3Client = s3Client;
+            _multipartThreshold = multipartThreshold;
+            _partSize = partSize;
+        }
+
+        public async Task<UploadResponse> UploadAsync(string targetPath, Stream data, CancellationToken token)
+        {
+            try
+            {
+                if (_multipartThreshold > 0 && data.Length > _multipartThreshold)
+                {
+                    await UploadStreamToMultipartsAsync(targetPath, data, token);
+                }
+                else
+                {
+                    await _s3Client.PutObjectAsync(new()
+                    {
+                        BucketName = BucketName,
+                        Key = targetPath,
+                        InputStream = data,
+                    }, token);
+                }
+            }
+            catch (Exception e)
+            {
+                return new UploadResponse
+                {
+                    IsSuccess = false,
+                    RemoteFilePath = targetPath,
+                    ErrorMessage = e.Message
+                };
+            }
+            return new UploadResponse
+            {
+                IsSuccess = true,
+                RemoteFilePath = targetPath
+            };
+        }
+
+        private async Task UploadStreamToMultipartsAsync(string targetPath, Stream data, CancellationToken token)
+        {
+            var multipartUpload = await _s3Client.InitiateMultipartUploadAsync(new()
+            {
+                BucketName = BucketName,
+                Key = targetPath,
+            }, token);
+            var partsETags = new List<PartETag>();
+            var partNumber = 1;
+
+            while (true)
+            {
+                // Each iteration wraps the next _partSize window of the source stream.
+                await using var chunk = new ReadOnlyChunkedStream(data, _partSize);
+
+                if (!chunk.CanRead || chunk.Length == 0)
+                {
+                    break;
+                }
+
+                var partUpload = await _s3Client.UploadPartAsync(new()
+                {
+                    BucketName = BucketName,
+                    PartNumber = partNumber++,
+                    Key = targetPath,
+                    UploadId = multipartUpload.UploadId,
+                    InputStream = chunk,
+                }, token);
+
+                partsETags.Add(new() { ETag = partUpload.ETag, PartNumber = partUpload.PartNumber, });
+            }
+            await _s3Client.CompleteMultipartUploadAsync(new()
+            {
+                BucketName = BucketName,
+                Key = targetPath,
+                UploadId = multipartUpload.UploadId,
+                PartETags = partsETags
+            }, token);
+        }
+
+        public UploadMode Mode => UploadMode.S3;
+    }
+}
\ No newline at end of file
diff --git a/TCC.Lib/TCC.Lib.csproj b/TCC.Lib/TCC.Lib.csproj
index 8534aae..4befbab 100644
--- a/TCC.Lib/TCC.Lib.csproj
+++ b/TCC.Lib/TCC.Lib.csproj
@@ -6,6 +6,7 @@
+
@@ -17,6 +18,7 @@
+
diff --git a/TCC.Lib/TarCompressCrypt.cs b/TCC.Lib/TarCompressCrypt.cs
index 91f9bd0..9218a22 100644
--- a/TCC.Lib/TarCompressCrypt.cs
+++ b/TCC.Lib/TarCompressCrypt.cs
@@ -1,448 +1,455 @@
-using Microsoft.Extensions.DependencyInjection;
-using Microsoft.Extensions.Logging;
-using System;
-using System.Collections.Generic;
-using System.Diagnostics;
-using System.IO;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using TCC.Lib.AsyncStreams;
-using TCC.Lib.Blocks;
-using TCC.Lib.Command;
-using TCC.Lib.Database;
-using TCC.Lib.Dependencies;
-using TCC.Lib.Helpers;
-using TCC.Lib.Options;
-using TCC.Lib.PrepareBlocks;
-using TCC.Lib.Storage;
-
-namespace TCC.Lib
-{
-    public class TarCompressCrypt
-    {
-        private readonly CancellationTokenSource _cancellationTokenSource;
-        private readonly ILogger<TarCompressCrypt> _logger;
-        private readonly EncryptionCommands _encryptionCommands;
-        private readonly CompressionCommands _compressionCommands;
-        private readonly IServiceProvider _serviceProvider;
-        private readonly DatabaseHelper _databaseHelper;
-        private int _compressCounter;
-        private int _uploadCounter;
-        private int _totalCounter;
-
-        public TarCompressCrypt(CancellationTokenSource cancellationTokenSource, ILogger<TarCompressCrypt> logger, EncryptionCommands encryptionCommands, CompressionCommands compressionCommands, IServiceProvider serviceProvider, DatabaseHelper databaseHelper)
-        {
-            _cancellationTokenSource = cancellationTokenSource;
-            _logger = logger;
-            _encryptionCommands = encryptionCommands;
-            _compressionCommands = compressionCommands;
-            _serviceProvider = serviceProvider;
-            _databaseHelper = databaseHelper;
-        }
-
-        public async Task<OperationSummary> CompressAsync(CompressOption option)
-        {
-            var sw = Stopwatch.StartNew();
-
-            var compFolder = new CompressionFolderProvider(new DirectoryInfo(option.DestinationDir), option.FolderPerDay);
-
-            IEnumerable<CompressionBlock> blocks = option.GenerateCompressBlocks(compFolder);
-            IPrepareCompressBlocks prepare = new FileSystemPrepareCompressBlocks(compFolder, option.BackupMode, option.CleanupTime);
-            var buffer = prepare.PrepareCompressionBlocksAsync(blocks);
-            _totalCounter = buffer.Count;
-            PrepareBoostRatio(option, buffer);
-            _logger.LogInformation("job prepared in " + sw.Elapsed.HumanizedTimeSpan());
-            _logger.LogInformation("Starting compression job");
-            var po = ParallelizeOption(option);
-
-            IRemoteStorage uploader = await option.GetRemoteStorageAsync(_logger, _cancellationTokenSource.Token);
-
-            var operationBlocks = await buffer
-                .AsAsyncStream(_cancellationTokenSource.Token)
-                .CountAsync(out var counter)
-                // Prepare encryption
-                .ParallelizeStreamAsync(async (b, token) =>
-                {
-                    b.StartTime = DateTime.UtcNow;
-                    await _encryptionCommands.PrepareEncryptionKey(b, option, token);
-                    return b;
-                }, po)
-                // Core loop
-                .ParallelizeStreamAsync((block, token) =>
-                {
-                    return CompressionBlockInternal(option, block, token);
-                }, po)
-                // Cleanup loop
-                .ParallelizeStreamAsync(async (opb, token) =>
-                {
-                    await CleanupOldFiles(opb);
-                    await _encryptionCommands.CleanupKey(opb.BlockResult.Block, option, Mode.Compress);
-                    return opb;
-                }, po)
-                // Upload loop
-                .ParallelizeStreamAsync((block, token) => UploadBlockInternal(uploader, option, block, token), new ParallelizeOption { FailMode = Fail.Smart, MaxDegreeOfParallelism = option.AzThread ?? 1 })
-                .AsReadOnlyCollectionAsync();
-
-            sw.Stop();
-            return new OperationSummary(operationBlocks, option.Threads, sw, option.SourceDirOrFile);
-        }
-
-        private async Task<OperationCompressionBlock> UploadBlockInternal(IRemoteStorage uploader, CompressOption option, OperationCompressionBlock block, CancellationToken token)
-        {
-            if (uploader is NoneRemoteStorage)
-            {
-                return block;
-            }
-
-            int count = Interlocked.Increment(ref _uploadCounter);
-            string progress = $"{count}/{_totalCounter}";
-
-            var file = block.CompressionBlock.DestinationArchiveFileInfo;
-            var name = file.Name;
-            RetryContext ctx = null;
-            while (true)
-            {
-                bool hasError;
-                try
-                {
-                    var sw = Stopwatch.StartNew();
-
-                    var result = await uploader.UploadAsync(file, block.CompressionBlock.FolderProvider.RootFolder, token);
-                    hasError = !result.IsSuccess;
-
-                    sw.Stop();
-                    double speed = file.Length / sw.Elapsed.TotalSeconds;
-
-                    block.BlockResult.StepResults.Add(new StepResult
-                    {
-                        Type = StepType.Upload,
-                        Errors = result.IsSuccess ? null : result.ErrorMessage,
-                        Infos = result.IsSuccess ? result.ErrorMessage : null,
-                        Duration = sw.Elapsed,
-                        ArchiveFileSize = file.Length,
-                    });
-
-
-                    if (!hasError)
-                    {
-                        _logger.LogInformation($"{progress} Uploaded \"{file.Name}\" in {sw.Elapsed.HumanizedTimeSpan()} at {speed.HumanizedBandwidth()} ");
-                    }
-                    else
-                    {
-                        if (ctx == null && option.RetryPeriodInSeconds.HasValue)
-                        {
-                            ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
-                        }
-                        _logger.LogError($"{progress} Uploaded {file.Name} with errors. {result.ErrorMessage}");
-                    }
-                }
-                catch (Exception e)
-                {
-                    hasError = true;
-                    if (ctx == null && option.RetryPeriodInSeconds.HasValue)
-                    {
-                        ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
-                    }
-                    _logger.LogCritical(e, $"{progress} Error uploading {name}");
-                }
-
-                if (hasError)
-                {
-                    if (ctx != null && await ctx.WaitForNextRetry())
-                    {
-                        _logger.LogWarning($"{progress} Retrying uploading {name}, attempt #{ctx.Retries}");
-                    }
-                    else
-                    {
-                        break;
-                    }
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            return block;
-        }
-
-        private async Task CleanupOldFiles(OperationCompressionBlock opb)
-        {
-            if (opb.CompressionBlock.FullsToDelete != null)
-            {
-                foreach (var full in opb.CompressionBlock.FullsToDelete)
-                {
-                    try
-                    {
-                        await full.TryDeleteFileWithRetryAsync();
-                    }
-                    catch (Exception e)
-                    {
-                        _logger.LogError(e, "Error while deleting FULL file ");
-                    }
-                }
-            }
-            if (opb.CompressionBlock.DiffsToDelete != null)
-            {
-                foreach (var diff in opb.CompressionBlock.DiffsToDelete)
-                {
-                    try
-                    {
-                        await diff.TryDeleteFileWithRetryAsync();
-                    }
-                    catch (Exception e)
-                    {
-                        _logger.LogError(e, "Error while deleting DIFF file ");
-                    }
-                }
-            }
-        }
-
-        private void PrepareBoostRatio(CompressOption option, IEnumerable<CompressionBlock> buffer)
-        {
-            _logger.LogInformation("Requested order : ");
-            int countFull = 0;
-            int countDiff = 0;
-            foreach (CompressionBlock block in buffer)
-            {
-                if (block.BackupMode == BackupMode.Full)
-                {
-                    countFull++;
-                }
-                else
-                {
-                    countDiff++;
-                }
-            }
-            if (option.Threads > 1 && option.BoostRatio.HasValue)
-            {
-                if (countFull == 0 && countDiff > 0)
-                {
-                    _logger.LogInformation($"100% diff ({countDiff}), running with X{option.BoostRatio.Value} more threads");
-                    // boost mode when 100% of diff, we want to saturate iops : X mode
-                    option.Threads = Math.Min(option.Threads * option.BoostRatio.Value, countDiff);
-                    if (option.AzThread.HasValue)
-                    {
-                        option.AzThread = Math.Min(option.AzThread.Value * option.BoostRatio.Value, countDiff);
-                    }
-                }
-                else if (countFull != 0 && (countDiff / (double)(countFull + countDiff) >= 0.9))
-                {
-                    _logger.LogInformation($"{countFull} full, {countDiff} diffs, running with X{option.BoostRatio.Value} more threads");
-                    // boost mode when 95% of diff, we want to saturate iops
-                    option.Threads = Math.Min(option.Threads * option.BoostRatio.Value, countDiff);
-                    if (option.AzThread.HasValue)
-                    {
-                        option.AzThread = Math.Min(option.AzThread.Value * option.BoostRatio.Value, countDiff);
-                    }
-                }
-                else
-                {
-                    _logger.LogInformation($"No boost mode : {countFull} full, {countDiff} diffs");
-                }
-            }
-        }
-
-        private async Task<OperationCompressionBlock> CompressionBlockInternal(CompressOption option, CompressionBlock block, CancellationToken token)
-        {
-
-            int count = Interlocked.Increment(ref _compressCounter);
-            string progress = $"{count}/{_totalCounter}";
-            RetryContext ctx = null;
-            CommandResult result = null;
-            while (true)
-            {
-                bool hasError = false;
-                try
-                {
-                    string cmd = _compressionCommands.CompressCommand(block, option);
-                    result = await cmd.Run(block.OperationFolder, token);
-                    result.ArchiveFileSize = block.DestinationArchiveFileInfo.Length;
-                    LogCompressionReport(block, result);
-
-                    if (result.HasError)
-                    {
-                        hasError = true;
-                        if (ctx == null && option.RetryPeriodInSeconds.HasValue)
-                        {
-                            ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
-                        }
-                    }
-                    var report = $"{progress} [{block.BackupMode}] : {block.BlockName}";
-                    if (block.BackupMode == BackupMode.Diff)
-                    {
-                        report += $" (from {block.DiffDate})";
-                    }
-                    _logger.LogInformation(report);
-
-                }
-                catch (Exception e)
-                {
-                    hasError = true;
-                    if (ctx == null && option.RetryPeriodInSeconds.HasValue)
-                    {
-                        ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
-                    }
-                    _logger.LogCritical(e, $"{progress} Error compressing {block.Source}");
-                }
-
-                if (hasError)
-                {
-                    if (ctx != null && await ctx.WaitForNextRetry())
-                    {
-                        _logger.LogWarning($"{progress} Retrying compressing {block.Source}, attempt #{ctx.Retries}");
-                        await block.DestinationArchiveFileInfo.TryDeleteFileWithRetryAsync();
-                    }
-                    else
-                    {
-                        await block.DestinationArchiveFileInfo.TryDeleteFileWithRetryAsync();
-                        break;
-                    }
-                }
-                else
-                {
-                    break;
-                }
-            }
-            return new OperationCompressionBlock(block, result);
-        }
-
-        public async Task<OperationSummary> DecompressAsync(DecompressOption option)
-        {
-            var sw = Stopwatch.StartNew();
-            var po = ParallelizeOption(option);
-            var job = await _databaseHelper.InitializeRestoreJobAsync();
-
-            IEnumerable<DecompressionBatch> blocks = option.GenerateDecompressBlocks().ToList();
-            var prepare = new DatabasePreparedDecompressionBlocks(_serviceProvider.GetRequiredService<TccRestoreDbContext>(), _logger);
-            IAsyncEnumerable<DecompressionBatch> ordered = prepare.PrepareDecompressionBlocksAsync(blocks);
-
-            IReadOnlyCollection<OperationDecompressionsBlock> operationBlocks =
-                await ordered
-                    .AsAsyncStream(_cancellationTokenSource.Token)
-                    .CountAsync(out var counter)
-                    // Prepare decryption
-                    .ParallelizeStreamAsync(async (b, token) =>
-                    {
-                        b.StartTime = DateTime.UtcNow;
-                        await _encryptionCommands.PrepareDecryptionKey(b.BackupFull ?? b.BackupsDiff.FirstOrDefault(), option, token);
-                        return b;
-                    }, po)
-                    // Core loop
-                    .ParallelizeStreamAsync(async (batch, token) =>
-                    {
-                        int count = Interlocked.Increment(ref _compressCounter);
-                        string progress = $"{count}/{_totalCounter}";
-
-                        var blockResults = new List<BlockResult>();
-
-                        if (batch.BackupFull != null)
-                        {
-                            batch.BackupFullCommandResult = await DecompressBlock(option, batch.BackupFull, token);
-                            blockResults.Add(new BlockResult(batch.BackupFull, batch.BackupFullCommandResult, StepType.Decompression));
-                        }
-
-                        if (batch.BackupsDiff != null)
-                        {
-                            batch.BackupDiffCommandResult = new CommandResult[batch.BackupsDiff.Length];
-                            for (int i = 0; i < batch.BackupsDiff.Length; i++)
-                            {
-                                batch.BackupDiffCommandResult[i] = await DecompressBlock(option, batch.BackupsDiff[i], token);
-                                blockResults.Add(new BlockResult(batch.BackupsDiff[i], batch.BackupDiffCommandResult[i], StepType.Decompression));
-                            }
-                        }
-
-                        if (batch.BackupFull != null)
-                        {
-                            var report = $"{progress} [{BackupMode.Full}] : {batch.BackupFull.BlockName} (from {batch.BackupFull.BlockDate})";
-                            _logger.LogInformation(report);
-                        }
-                        if (batch.BackupsDiff != null)
-                        {
-                            foreach (var dec in batch.BackupsDiff)
-                            {
-                                var report = $"{progress} [{BackupMode.Diff}] : {dec.BlockName} (from {dec.BlockDate})";
-                                _logger.LogInformation(report);
-                            }
-                        }
-
-                        return new OperationDecompressionsBlock(blockResults, batch);
-                    }, po)
-                    // Cleanup loop
-                    .ParallelizeStreamAsync(async (odb, token) =>
-                    {
-                        foreach (var b in odb.BlockResults)
-                        {
-                            await _encryptionCommands.CleanupKey(b.Block, option, Mode.Compress);
-                        }
-                        return odb;
-                    }, po)
-                    .AsReadOnlyCollectionAsync();
-
-            sw.Stop();
-            await _databaseHelper.AddRestoreBlockJobAsync(job, operationBlocks);
-            await _databaseHelper.UpdateRestoreJobStatsAsync(sw, job);
-            return new OperationSummary(operationBlocks, option.Threads, sw, option.SourceDirOrFile);
-        }
-
-        private async Task<CommandResult> DecompressBlock(DecompressOption option, DecompressionBlock block, CancellationToken token)
-        {
-            CommandResult result = null;
-            try
-            {
-                string cmd = _compressionCommands.DecompressCommand(block, option);
-                result = await cmd.Run(block.OperationFolder, token);
-                result.ArchiveFileSize = block.SourceArchiveFileInfo.Length;
-
-                LogReport(block, result);
-            }
-            catch (Exception e)
-            {
-                _logger.LogError(e, $"Error decompressing {block.Source}");
-            }
-            return result;
-        }
-
-        private static ParallelizeOption ParallelizeOption(TccOption option)
-        {
-            var po = new ParallelizeOption
-            {
-                FailMode = option.FailFast ? Fail.Fast : Fail.Smart,
-                MaxDegreeOfParallelism = option.Threads
-            };
-            return po;
-        }
-
-        private void LogCompressionReport(CompressionBlock block, CommandResult result)
-        {
-            _logger.LogInformation($"Compressed {block.Source} in {result?.Elapsed.HumanizedTimeSpan()}, {block.DestinationArchiveFileInfo.Length.HumanizeSize()}, {block.DestinationArchiveFileInfo.FullName}");
-
-            if (result?.Infos.Any() ?? false)
-            {
-                _logger.LogWarning($"Compressed {block.Source} with warning : {string.Join(Environment.NewLine, result.Infos)}");
-            }
-
-            if (result?.HasError ?? false)
-            {
-                _logger.LogError($"Compressed {block.Source} with errors : {result.Errors}");
-            }
-        }
-
-
-        private void LogReport(DecompressionBlock block, CommandResult result)
-        {
-            _logger.LogInformation($"Decompressed {block.Source} in {result?.Elapsed.HumanizedTimeSpan()}, {block.SourceArchiveFileInfo.Length.HumanizeSize()}, {block.SourceArchiveFileInfo.FullName}");
-
-            if (result?.Infos.Any() ?? false)
-            {
-                _logger.LogWarning($"Decompressed {block.Source} with warning : {string.Join(Environment.NewLine, result.Infos)}");
-            }
-
-            if (result?.HasError ?? false)
-            {
-                _logger.LogError($"Decompressed {block.Source} with errors : {result.Errors}");
-            }
-        }
-    }
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using TCC.Lib.AsyncStreams;
+using TCC.Lib.Blocks;
+using TCC.Lib.Command;
+using TCC.Lib.Database;
+using TCC.Lib.Dependencies;
+using TCC.Lib.Helpers;
+using TCC.Lib.Options;
+using TCC.Lib.PrepareBlocks;
+using TCC.Lib.Storage;
+
+namespace TCC.Lib
+{
+    public class TarCompressCrypt
+    {
+        private readonly CancellationTokenSource _cancellationTokenSource;
+        private readonly ILogger<TarCompressCrypt> _logger;
+        private readonly EncryptionCommands _encryptionCommands;
+        private readonly CompressionCommands _compressionCommands;
+        private readonly IServiceProvider _serviceProvider;
+        private readonly DatabaseHelper _databaseHelper;
+        private int _compressCounter;
+        private int _uploadCounter;
+        private int _totalCounter;
+
+        public TarCompressCrypt(CancellationTokenSource cancellationTokenSource, ILogger<TarCompressCrypt> logger, EncryptionCommands encryptionCommands, CompressionCommands compressionCommands, IServiceProvider serviceProvider, DatabaseHelper databaseHelper)
+        {
+            _cancellationTokenSource = cancellationTokenSource;
+            _logger = logger;
+            _encryptionCommands = encryptionCommands;
+            _compressionCommands = compressionCommands;
+            _serviceProvider = serviceProvider;
+            _databaseHelper = databaseHelper;
+        }
+
+        public async Task<OperationSummary> CompressAsync(CompressOption option)
+        {
+            var sw = Stopwatch.StartNew();
+
+            var compFolder = new CompressionFolderProvider(new DirectoryInfo(option.DestinationDir), option.FolderPerDay);
+
+            IEnumerable<CompressionBlock> blocks = option.GenerateCompressBlocks(compFolder);
+            IPrepareCompressBlocks prepare = new FileSystemPrepareCompressBlocks(compFolder, option.BackupMode, option.CleanupTime);
+            var buffer = prepare.PrepareCompressionBlocksAsync(blocks);
+            _totalCounter = buffer.Count;
+            PrepareBoostRatio(option, buffer);
+            _logger.LogInformation("job prepared in " + sw.Elapsed.HumanizedTimeSpan());
+            _logger.LogInformation("Starting compression job");
+            var po = ParallelizeOption(option);
+
+            List<IRemoteStorage> uploaders = await option.GetRemoteStoragesAsync(_logger, _cancellationTokenSource.Token).ToListAsync();
+
+            var operationBlocks = await buffer
+                .AsAsyncStream(_cancellationTokenSource.Token)
+                .CountAsync(out var counter)
+                // Prepare encryption
+                .ParallelizeStreamAsync(async (b, token) =>
+                {
+                    b.StartTime = DateTime.UtcNow;
+                    await _encryptionCommands.PrepareEncryptionKey(b, option, token);
+                    return b;
+                }, po)
+                // Core loop
+                .ParallelizeStreamAsync((block, token) =>
+                {
+                    return CompressionBlockInternal(option, block, token);
+                }, po)
+                // Cleanup loop
+                .ParallelizeStreamAsync(async (opb, token) =>
+                {
+                    await CleanupOldFiles(opb);
+                    await _encryptionCommands.CleanupKey(opb.BlockResult.Block, option, Mode.Compress);
+                    return opb;
+                }, po)
+                // Upload loop
+                .ParallelizeStreamAsync((block, token) => UploadBlockInternal(uploaders, option, block, token), new ParallelizeOption { FailMode = Fail.Smart, MaxDegreeOfParallelism = option.AzThread ?? 1 })
+                .AsReadOnlyCollectionAsync();
+
+            sw.Stop();
+            return new OperationSummary(operationBlocks, option.Threads, sw, option.SourceDirOrFile);
+        }
+
+        private async Task<OperationCompressionBlock> UploadBlockInternal(IEnumerable<IRemoteStorage> uploaders, CompressOption option, OperationCompressionBlock block, CancellationToken token)
+        {
+            int count = Interlocked.Increment(ref _uploadCounter);
+            string progress = $"{count}/{_totalCounter}";
+
+            foreach (var uploader in uploaders)
+            {
+                if (uploader is NoneRemoteStorage)
+                {
+                    continue;
+                }
+                await UploadBlockToSingleRemoteStorageAsync(option, block, progress, uploader, token);
+            }
+            return block;
+        }
+
+        private async Task UploadBlockToSingleRemoteStorageAsync(CompressOption option, OperationCompressionBlock block, string progress, IRemoteStorage uploader, CancellationToken token)
+        {
+            var file = block.CompressionBlock.DestinationArchiveFileInfo;
+            var name = file.Name;
+            RetryContext ctx = null;
+            while (true)
+            {
+                bool hasError;
+                try
+                {
+                    var sw = Stopwatch.StartNew();
+
+                    var result = await uploader.UploadAsync(file, block.CompressionBlock.FolderProvider.RootFolder, token);
+                    hasError = !result.IsSuccess;
+
+                    sw.Stop();
+                    double speed = file.Length / sw.Elapsed.TotalSeconds;
+
+                    block.BlockResult.StepResults.Add(new StepResult
+                    {
+                        Type = StepType.Upload,
+                        UploadMode = uploader.Mode,
+                        Errors = result.IsSuccess ? null : result.ErrorMessage,
+                        Infos = result.IsSuccess ? result.ErrorMessage : null,
+                        Duration = sw.Elapsed,
+                        ArchiveFileSize = file.Length,
+                    });
+
+
+                    if (!hasError)
+                    {
+                        _logger.LogInformation("[{mode}] {progress} Uploaded \"{filename}\" in {duration} at {speed} ", uploader.Mode, progress, file.Name, sw.Elapsed.HumanizedTimeSpan(), speed.HumanizedBandwidth());
+                    }
+                    else
+                    {
+                        if (ctx == null && option.RetryPeriodInSeconds.HasValue)
+                        {
+                            ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
+                        }
+                        _logger.LogError("[{mode}] {progress} Uploaded {filename} with errors. {errorMessage}", uploader.Mode, progress, file.Name, result.ErrorMessage);
+                    }
+                }
+                catch (Exception e)
+                {
+                    hasError = true;
+                    if (ctx == null && option.RetryPeriodInSeconds.HasValue)
+                    {
+                        ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
+                    }
+                    _logger.LogCritical(e, "[{mode}] {progress} Error uploading {name}", uploader.Mode, progress, name);
+                }
+
+                if (hasError)
+                {
+                    if (ctx != null && await ctx.WaitForNextRetry())
+                    {
+                        _logger.LogWarning("[{mode}] {progress} Retrying uploading {name}, attempt #{attempt}", uploader.Mode, progress, name, ctx.Retries);
+                    }
+                    else
+                    {
+                        break;
+                    }
+                }
+                else
+                {
+                    break;
+                }
+            }
+        }
+
+        private async Task CleanupOldFiles(OperationCompressionBlock opb)
+        {
+            if (opb.CompressionBlock.FullsToDelete != null)
+            {
+                foreach (var full in opb.CompressionBlock.FullsToDelete)
+                {
+                    try
+                    {
+                        await full.TryDeleteFileWithRetryAsync();
+                    }
+                    catch (Exception e)
+                    {
+                        _logger.LogError(e, "Error while deleting FULL file ");
+                    }
+                }
+            }
+            if (opb.CompressionBlock.DiffsToDelete != null)
+            {
+                foreach (var diff in opb.CompressionBlock.DiffsToDelete)
+                {
+                    try
+                    {
+                        await diff.TryDeleteFileWithRetryAsync();
+                    }
+                    catch (Exception e)
+                    {
+                        _logger.LogError(e, "Error while deleting DIFF file ");
+                    }
+                }
+            }
+        }
+
+        private void PrepareBoostRatio(CompressOption option, IEnumerable<CompressionBlock> buffer)
+        {
+            _logger.LogInformation("Requested order : ");
+            int countFull = 0;
+            int countDiff = 0;
+            foreach (CompressionBlock block in buffer)
+            {
+                if (block.BackupMode == BackupMode.Full)
+                {
+                    countFull++;
+                }
+                else
+                {
+                    countDiff++;
+                }
+            }
+            if (option.Threads > 1 && option.BoostRatio.HasValue)
+            {
+                if (countFull == 0 && countDiff > 0)
+                {
+                    _logger.LogInformation($"100% diff ({countDiff}), running with X{option.BoostRatio.Value} more threads");
+                    // boost mode when 100% of diff, we want to saturate iops : X mode
+                    option.Threads = Math.Min(option.Threads * option.BoostRatio.Value, countDiff);
+                    if (option.AzThread.HasValue)
+                    {
+                        option.AzThread = Math.Min(option.AzThread.Value * option.BoostRatio.Value, countDiff);
+                    }
+                }
+                else if (countFull != 0 && (countDiff / (double)(countFull + countDiff) >= 0.9))
+                {
+                    _logger.LogInformation($"{countFull} full, {countDiff} diffs, running with X{option.BoostRatio.Value} more threads");
+                    // boost mode when 95% of diff, we want to saturate iops
+                    option.Threads = Math.Min(option.Threads * option.BoostRatio.Value, countDiff);
+                    if (option.AzThread.HasValue)
+                    {
+                        option.AzThread = Math.Min(option.AzThread.Value * option.BoostRatio.Value, countDiff);
+                    }
+                }
+                else
+                {
+                    _logger.LogInformation($"No boost mode : {countFull} full, {countDiff} diffs");
+                }
+            }
+        }
+
+        private async Task<OperationCompressionBlock> CompressionBlockInternal(CompressOption option, CompressionBlock block, CancellationToken token)
+        {
+
+            int count = Interlocked.Increment(ref _compressCounter);
+            string progress = $"{count}/{_totalCounter}";
+            RetryContext ctx = null;
+            CommandResult result = null;
+            while (true)
+            {
+                bool hasError = false;
+                try
+                {
+                    string cmd = _compressionCommands.CompressCommand(block, option);
+                    result = await cmd.Run(block.OperationFolder, token);
+                    result.ArchiveFileSize = block.DestinationArchiveFileInfo.Length;
+                    LogCompressionReport(block, result);
+
+                    if (result.HasError)
+                    {
+                        hasError = true;
+                        if (ctx == null && option.RetryPeriodInSeconds.HasValue)
+                        {
+                            ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
+                        }
+                    }
+                    var report = $"{progress} [{block.BackupMode}] : {block.BlockName}";
+                    if (block.BackupMode == BackupMode.Diff)
+                    {
+                        report += $" (from {block.DiffDate})";
+                    }
+                    _logger.LogInformation(report);
+
+                }
+                catch (Exception e)
+                {
+                    hasError = true;
+                    if (ctx == null && option.RetryPeriodInSeconds.HasValue)
+                    {
+                        ctx = new RetryContext(option.RetryPeriodInSeconds.Value);
+                    }
+                    _logger.LogCritical(e, $"{progress} Error compressing {block.Source}");
+                }
+
+                if (hasError)
+                {
+                    if (ctx != null && await ctx.WaitForNextRetry())
+                    {
+                        _logger.LogWarning($"{progress} Retrying compressing {block.Source}, attempt #{ctx.Retries}");
+                        await block.DestinationArchiveFileInfo.TryDeleteFileWithRetryAsync();
+                    }
+                    else
+                    {
+                        await block.DestinationArchiveFileInfo.TryDeleteFileWithRetryAsync();
+                        break;
+                    }
+                }
+                else
+                {
+                    break;
+                }
+            }
+            return new OperationCompressionBlock(block, result);
+        }
+
+        public async Task<OperationSummary> DecompressAsync(DecompressOption option)
+        {
+            var sw = Stopwatch.StartNew();
+            var po = ParallelizeOption(option);
+            var job = await _databaseHelper.InitializeRestoreJobAsync();
+
+            IEnumerable<DecompressionBatch> blocks = option.GenerateDecompressBlocks().ToList();
+            var prepare = new DatabasePreparedDecompressionBlocks(_serviceProvider.GetRequiredService<TccRestoreDbContext>(), _logger);
+            IAsyncEnumerable<DecompressionBatch> ordered = prepare.PrepareDecompressionBlocksAsync(blocks);
+
+            IReadOnlyCollection<OperationDecompressionsBlock> operationBlocks =
+                await ordered
+                    .AsAsyncStream(_cancellationTokenSource.Token)
+                    .CountAsync(out var counter)
+                    // Prepare decryption
+                    .ParallelizeStreamAsync(async (b, token) =>
+                    {
+                        b.StartTime = DateTime.UtcNow;
+                        await _encryptionCommands.PrepareDecryptionKey(b.BackupFull ?? b.BackupsDiff.FirstOrDefault(), option, token);
+                        return b;
+                    }, po)
+                    // Core loop
+                    .ParallelizeStreamAsync(async (batch, token) =>
+                    {
+                        int count = Interlocked.Increment(ref _compressCounter);
+                        string progress = $"{count}/{_totalCounter}";
+
+                        var blockResults = new List<BlockResult>();
+
+                        if (batch.BackupFull != null)
+                        {
+                            batch.BackupFullCommandResult = await DecompressBlock(option, batch.BackupFull, token);
+                            blockResults.Add(new BlockResult(batch.BackupFull, batch.BackupFullCommandResult, StepType.Decompression));
+                        }
+
+                        if (batch.BackupsDiff != null)
+                        {
+                            batch.BackupDiffCommandResult = new CommandResult[batch.BackupsDiff.Length];
+                            for (int i = 0; i < batch.BackupsDiff.Length; i++)
+                            {
+                                batch.BackupDiffCommandResult[i] = await DecompressBlock(option, batch.BackupsDiff[i], token);
+                                blockResults.Add(new BlockResult(batch.BackupsDiff[i], batch.BackupDiffCommandResult[i], StepType.Decompression));
+                            }
+                        }
+
+                        if (batch.BackupFull != null)
+                        {
+                            var report = $"{progress} [{BackupMode.Full}] : {batch.BackupFull.BlockName} (from {batch.BackupFull.BlockDate})";
+                            _logger.LogInformation(report);
+                        }
+                        if (batch.BackupsDiff != null)
+                        {
+                            foreach (var dec in batch.BackupsDiff)
+                            {
+                                var report = $"{progress} [{BackupMode.Diff}] : {dec.BlockName} (from {dec.BlockDate})";
+                                _logger.LogInformation(report);
+                            }
+                        }
+
+                        return new OperationDecompressionsBlock(blockResults, batch);
+                    }, po)
+                    // Cleanup loop
+                    .ParallelizeStreamAsync(async (odb, token) =>
+                    {
+                        foreach (var b in odb.BlockResults)
+                        {
+                            await _encryptionCommands.CleanupKey(b.Block, option, Mode.Compress);
+                        }
+                        return odb;
+                    }, po)
+                    .AsReadOnlyCollectionAsync();
+
+            sw.Stop();
+            await _databaseHelper.AddRestoreBlockJobAsync(job, operationBlocks);
+            await _databaseHelper.UpdateRestoreJobStatsAsync(sw, job);
+            return new OperationSummary(operationBlocks, option.Threads, sw, option.SourceDirOrFile);
+        }
+
+        private async Task<CommandResult> DecompressBlock(DecompressOption option, DecompressionBlock block, CancellationToken token)
+        {
+            CommandResult result = null;
+            try
+            {
+                string cmd = _compressionCommands.DecompressCommand(block, option);
+                result = await cmd.Run(block.OperationFolder, token);
+                result.ArchiveFileSize = block.SourceArchiveFileInfo.Length;
+
+                LogReport(block, result);
+            }
+            catch (Exception e)
+            {
+                _logger.LogError(e, $"Error decompressing {block.Source}");
+            }
+            return result;
+        }
+
+        private static ParallelizeOption ParallelizeOption(TccOption option)
+        {
+            var po = new ParallelizeOption
+            {
+                FailMode = option.FailFast ? Fail.Fast : Fail.Smart,
+                MaxDegreeOfParallelism = option.Threads
+            };
+            return po;
+        }
+
+        private void LogCompressionReport(CompressionBlock block, CommandResult result)
+        {
+            _logger.LogInformation($"Compressed {block.Source} in {result?.Elapsed.HumanizedTimeSpan()}, {block.DestinationArchiveFileInfo.Length.HumanizeSize()}, {block.DestinationArchiveFileInfo.FullName}");
+
+            if (result?.Infos.Any() ?? false)
+            {
+                _logger.LogWarning($"Compressed {block.Source} with warning : {string.Join(Environment.NewLine, result.Infos)}");
+            }
+
+            if (result?.HasError ?? false)
+            {
+                _logger.LogError($"Compressed {block.Source} with errors : {result.Errors}");
+            }
+        }
+
+
+        private void LogReport(DecompressionBlock block, CommandResult result)
+        {
+            _logger.LogInformation($"Decompressed {block.Source} in {result?.Elapsed.HumanizedTimeSpan()}, {block.SourceArchiveFileInfo.Length.HumanizeSize()}, {block.SourceArchiveFileInfo.FullName}");
+
+            if (result?.Infos.Any() ?? false)
+            {
+                _logger.LogWarning($"Decompressed {block.Source} with warning : {string.Join(Environment.NewLine, result.Infos)}");
+            }
+
+            if (result?.HasError ?? false)
+            {
+                _logger.LogError($"Decompressed {block.Source} with errors : {result.Errors}");
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/TCC.Lib/UploadMode.cs b/TCC.Lib/UploadMode.cs
index 6ff5896..ebfae59 100644
--- a/TCC.Lib/UploadMode.cs
+++ b/TCC.Lib/UploadMode.cs
@@ -1,9 +1,10 @@
-namespace TCC.Lib
+namespace TCC.Lib
 {
     public enum UploadMode
     {
         None,
         AzureSdk,
-        GoogleCloudStorage
+        GoogleCloudStorage,
+        S3
     }
 }
\ No newline at end of file
diff --git a/TCC.Tests/Upload/BlobStorageTests.cs b/TCC.Tests/Upload/BlobStorageTests.cs
index 7a71523..830689e 100644
--- a/TCC.Tests/Upload/BlobStorageTests.cs
+++ b/TCC.Tests/Upload/BlobStorageTests.cs
@@ -1,111 +1,112 @@
-using Microsoft.Extensions.Logging.Abstractions;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
-using System;
-using System.IO;
-using System.Linq;
-using System.Threading;
-using System.Threading.Tasks;
-using TCC.Lib;
-using TCC.Lib.Benchmark;
-using TCC.Lib.Options;
-using TCC.Lib.Storage;
-using Xunit;
-
-namespace TCC.Tests.Upload
-{
-    public class UploadToStorageTests : IClassFixture<EnvVarFixture>
-    {
-        private EnvVarFixture _envVarFixture;
-
-        public UploadToStorageTests(EnvVarFixture envVarFixture)
-        {
-            _envVarFixture = envVarFixture;
-        }
-
-        [Fact(Skip = "desactivated accound")]
-        public async Task AzureUploadTest()
-        {
-            string toCompressFolder = TestFileHelper.NewFolder();
-            var data = await TestData.CreateFiles(1, 1024, toCompressFolder);
-
-            var opt = new CompressOption()
-            {
-                AzBlobUrl = GetEnvVar("AZ_URL"),
-                AzBlobContainer = GetEnvVar("AZ_CONTAINER"),
-                AzBlobSaS = GetEnvVar("AZ_SAS_TOKEN")
-            };
-            opt.UploadMode = UploadMode.AzureSdk;
-            var uploader = await opt.GetRemoteStorageAsync(NullLogger.Instance, CancellationToken.None);
-
-            var ok = await uploader.UploadAsync(data.Files.First(), new DirectoryInfo(toCompressFolder), CancellationToken.None);
-
-            Assert.True(ok.IsSuccess);
-        }
-
-        [Fact]
-        public async Task GoogleUploadTest()
-        {
-            string toCompressFolder = TestFileHelper.NewFolder();
-            var data = await TestData.CreateFiles(1, 1024, toCompressFolder);
-
-            var opt = new CompressOption()
-            {
-                GoogleStorageBucketName = GetEnvVar("GoogleBucket"),
-                GoogleStorageCredential = GetEnvVar("GoogleCredential")
-            };
-
-            opt.UploadMode = UploadMode.GoogleCloudStorage;
-            var uploader = await opt.GetRemoteStorageAsync(NullLogger.Instance, CancellationToken.None);
-
-            var ok = await uploader.UploadAsync(data.Files.First(), new DirectoryInfo(toCompressFolder), CancellationToken.None);
-
-            Assert.True(ok.IsSuccess);
-
-            var gs = uploader as GoogleRemoteStorage;
-
-            await gs.Storage.DeleteObjectAsync(gs.BucketName, ok.RemoteFilePath);
-        }
-
-        string GetEnvVar(string key)
-        {
-            var s = Environment.GetEnvironmentVariable(key);
-            Assert.True(s != null, key);
-            Assert.True(!string.IsNullOrWhiteSpace(s), key);
-            return s;
-        }
-
-    }
-
-
-    public class EnvVarFixture : IDisposable
-    {
-        public EnvVarFixture()
-        {
-            if (File.Exists("Properties\\launchSettings.json"))
-            {
-                using var file = File.OpenText("Properties\\launchSettings.json");
-                var reader = new JsonTextReader(file);
-                var jObject = JObject.Load(reader);
-
-                var variables = jObject
-                    .GetValue("profiles")
-                    //select a proper profile here
-                    .SelectMany(profiles => profiles.Children())
-                    .SelectMany(profile => profile.Children<JProperty>())
-                    .Where(prop => prop.Name == "environmentVariables")
-                    .SelectMany(prop => prop.Value.Children<JProperty>())
-                    .ToList();
-
-                foreach (var variable in variables)
-                {
-                    Environment.SetEnvironmentVariable(variable.Name, variable.Value.ToString());
-                }
-            }
-        }
-
-        public void Dispose()
-        {
-        }
-    }
-}
+using Microsoft.Extensions.Logging.Abstractions;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using TCC.Lib;
+using TCC.Lib.Benchmark;
+using TCC.Lib.Options;
+using TCC.Lib.Storage;
+using Xunit;
+
+namespace TCC.Tests.Upload
+{
+    public class UploadToStorageTests : IClassFixture<EnvVarFixture>
+    {
+        private EnvVarFixture _envVarFixture;
+
+        public UploadToStorageTests(EnvVarFixture envVarFixture)
+        {
+            _envVarFixture = envVarFixture;
+        }
+
+        [Fact(Skip = "deactivated account")]
+        public async Task AzureUploadTest()
+        {
+            string toCompressFolder = TestFileHelper.NewFolder();
+            var data = await TestData.CreateFiles(1, 1024, toCompressFolder);
+
+            var opt = new CompressOption()
+            {
+                AzBlobUrl = GetEnvVar("AZ_URL"),
+                AzBlobContainer = GetEnvVar("AZ_CONTAINER"),
+                AzBlobSaS = GetEnvVar("AZ_SAS_TOKEN")
+            };
+            opt.UploadModes = new List<UploadMode>() { UploadMode.AzureSdk };
+            var uploader = await opt.GetRemoteStoragesAsync(NullLogger.Instance, CancellationToken.None).ToListAsync();
+
+            var ok = await uploader.First().UploadAsync(data.Files.First(), new DirectoryInfo(toCompressFolder), CancellationToken.None);
+
+            Assert.True(ok.IsSuccess);
+        }
+
+        [Fact]
+        public async Task GoogleUploadTest()
+        {
+            string toCompressFolder = TestFileHelper.NewFolder();
+            var data = await TestData.CreateFiles(1, 1024, toCompressFolder);
+
+            var opt = new CompressOption()
+            {
+                GoogleStorageBucketName = GetEnvVar("GoogleBucket"),
+                GoogleStorageCredential = GetEnvVar("GoogleCredential")
+            };
+
+            opt.UploadModes = new List<UploadMode>() { UploadMode.GoogleCloudStorage };
+            var uploader = await opt.GetRemoteStoragesAsync(NullLogger.Instance, CancellationToken.None).ToListAsync();
+
+            var ok = await uploader.First().UploadAsync(data.Files.First(), new DirectoryInfo(toCompressFolder), CancellationToken.None);
+
+            Assert.True(ok.IsSuccess);
+
+            var gs = uploader.First() as GoogleRemoteStorage;
+
+            await gs.Storage.DeleteObjectAsync(gs.BucketName, ok.RemoteFilePath);
+        }
+
+        string GetEnvVar(string key)
+        {
+            var s = Environment.GetEnvironmentVariable(key);
+            Assert.True(s != null, key);
+            Assert.True(!string.IsNullOrWhiteSpace(s), key);
+            return s;
+        }
+
+    }
+
+
+    public class EnvVarFixture : IDisposable
+    {
+        public EnvVarFixture()
+        {
+            if (File.Exists("Properties\\launchSettings.json"))
+            {
+                using var file = File.OpenText("Properties\\launchSettings.json");
+                var reader = new JsonTextReader(file);
+                var jObject = JObject.Load(reader);
+
+                var variables = jObject
+                    .GetValue("profiles")
+                    //select a proper profile here
+                    .SelectMany(profiles => profiles.Children())
+                    .SelectMany(profile => profile.Children<JProperty>())
+                    .Where(prop => prop.Name == "environmentVariables")
+                    .SelectMany(prop => prop.Value.Children<JProperty>())
+                    .ToList();
+
+                foreach (var variable in variables)
+                {
+                    Environment.SetEnvironmentVariable(variable.Name, variable.Value.ToString());
+                }
+            }
+        }
+
+        public void Dispose()
+        {
+        }
+    }
+}
diff --git a/TCC/Parser/CompressCommand.cs b/TCC/Parser/CompressCommand.cs
index 8814060..159add9 100644
--- a/TCC/Parser/CompressCommand.cs
+++ b/TCC/Parser/CompressCommand.cs
@@ -58,13 +58,21 @@ protected override IEnumerable
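
For context, a minimal usage sketch of the multi-destination upload path this patch introduces. All endpoint, bucket and credential values below are placeholders, the logger and cancellation token are stand-ins, and the snippet is assumed to run inside an async method; only the types and calls shown in the diff above are relied on:

```csharp
using System.Threading;
using Microsoft.Extensions.Logging.Abstractions;
using TCC.Lib;
using TCC.Lib.Options;
using TCC.Lib.Storage;

// Hypothetical configuration: several destinations can now be combined via
// UploadModes, and a misconfigured mode is logged and skipped instead of
// silently downgrading the whole job to NoneRemoteStorage.
var opt = new CompressOption
{
    UploadModes = new[] { UploadMode.S3, UploadMode.GoogleCloudStorage },
    S3AccessKeyId = "AKIA-PLACEHOLDER",
    S3SecretAcessKey = "placeholder-secret", // property name as declared on CompressOption
    S3Host = "https://s3.example.com",
    S3Region = "us-east-1",
    S3BucketName = "backups",
    S3MultipartThreshold = "64Mo",           // ParseSize => 64 * 1024^2 bytes; larger files go multipart
    S3MultipartSize = "16Mo",                // size of each ReadOnlyChunkedStream part
    GoogleStorageBucketName = "backups-gcs",
    GoogleStorageCredential = "base64-encoded-credential",
};

// One IRemoteStorage is yielded per successfully configured mode.
await foreach (IRemoteStorage storage in opt.GetRemoteStoragesAsync(NullLogger.Instance, CancellationToken.None))
{
    // storage.Mode (UploadMode.S3, UploadMode.GoogleCloudStorage, ...) is the value
    // TarCompressCrypt stamps onto each upload StepResult, keeping per-backend
    // results distinguishable in the operation summary.
}
```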