Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Catalog2Dnx: parallelize commit processing (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtivel authored Nov 20, 2018
1 parent 18d1d32 commit ae0d22f
Show file tree
Hide file tree
Showing 15 changed files with 486 additions and 349 deletions.
92 changes: 84 additions & 8 deletions src/Catalog/Dnx/DnxCatalogCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -24,7 +25,8 @@ public class DnxCatalogCollector : CommitCollector
private readonly IAzureStorage _sourceStorage;
private readonly DnxMaker _dnxMaker;
private readonly ILogger _logger;
private readonly int _maxDegreeOfParallelism;
private readonly int _maxConcurrentBatches;
private readonly int _maxConcurrentCommitItemsWithinBatch;
private readonly Uri _contentBaseAddress;

public DnxCatalogCollector(
Expand Down Expand Up @@ -52,7 +54,43 @@ public DnxCatalogCollector(
string.Format(Strings.ArgumentOutOfRange, 1, int.MaxValue));
}

_maxDegreeOfParallelism = maxDegreeOfParallelism;
// Find two factors which are close or equal to each other.
var squareRoot = Math.Sqrt(maxDegreeOfParallelism);

// If the max degree of parallelism is a perfect square, great.
// Otherwise, prefer a greater degree of parallelism in batches than commit items within a batch.
_maxConcurrentBatches = Convert.ToInt32(Math.Ceiling(squareRoot));
_maxConcurrentCommitItemsWithinBatch = Convert.ToInt32(maxDegreeOfParallelism / _maxConcurrentBatches);

ServicePointManager.DefaultConnectionLimit = _maxConcurrentBatches * _maxConcurrentCommitItemsWithinBatch;
}

/// <summary>
/// Builds the batches of catalog commit items that will later be processed.
/// </summary>
/// <param name="catalogItems">The catalog commit items to split into batches.</param>
/// <returns>
/// An already-completed task whose result is whatever
/// <c>CatalogCommitUtilities.CreateCommitItemBatches</c> produced for <paramref name="catalogItems"/>
/// using <c>CatalogCommitUtilities.GetPackageIdKey</c> as the key selector
/// (presumably one batch per package ID -- confirm against CatalogCommitUtilities).
/// </returns>
protected override Task<IEnumerable<CatalogCommitItemBatch>> CreateBatchesAsync(
    IEnumerable<CatalogCommitItem> catalogItems)
{
    // Batch creation itself is synchronous; the result is only wrapped in a task to satisfy the signature.
    return Task.FromResult(
        CatalogCommitUtilities.CreateCommitItemBatches(
            catalogItems,
            CatalogCommitUtilities.GetPackageIdKey));
}

/// <summary>
/// Hands the whole fetch operation over to <c>CatalogCommitUtilities.ProcessCatalogCommitsAsync</c>.
/// </summary>
/// <remarks>
/// This collector supplies its own callbacks for fetching commits, creating batches, and processing a batch,
/// and caps the number of concurrently processed batches at <c>_maxConcurrentBatches</c>.
/// </remarks>
/// <param name="client">The HTTP client forwarded to the shared commit-processing routine.</param>
/// <param name="front">The read/write cursor forwarded to the shared commit-processing routine.</param>
/// <param name="back">The read-only cursor forwarded to the shared commit-processing routine.</param>
/// <param name="cancellationToken">A token forwarded to the shared commit-processing routine.</param>
/// <returns>The task returned by <c>CatalogCommitUtilities.ProcessCatalogCommitsAsync</c>.</returns>
protected override Task<bool> FetchAsync(
    CollectorHttpClient client,
    ReadWriteCursor front,
    ReadCursor back,
    CancellationToken cancellationToken)
    => CatalogCommitUtilities.ProcessCatalogCommitsAsync(
        client,
        front,
        back,
        FetchCatalogCommitsAsync,
        CreateBatchesAsync,
        ProcessBatchAsync,
        _maxConcurrentBatches,
        _logger,
        cancellationToken);

protected override async Task<bool> OnProcessBatchAsync(
Expand All @@ -66,7 +104,7 @@ protected override async Task<bool> OnProcessBatchAsync(
var catalogEntries = items.Select(item => CatalogEntry.Create(item))
.ToList();

// Sanity check: a single catalog batch should not contain multiple entries for the same package ID and version.
// Sanity check: a single catalog batch should not contain multiple entries for the same package identity.
AssertNoMultipleEntriesForSamePackageIdentity(commitTimeStamp, catalogEntries);

// Process .nupkg/.nuspec adds and deletes.
Expand All @@ -85,16 +123,16 @@ private async Task<IEnumerable<CatalogEntry>> ProcessCatalogEntriesAsync(
{
var processedCatalogEntries = new ConcurrentBag<CatalogEntry>();

await catalogEntries.ForEachAsync(_maxDegreeOfParallelism, async catalogEntry =>
await catalogEntries.ForEachAsync(_maxConcurrentCommitItemsWithinBatch, async catalogEntry =>
{
var packageId = catalogEntry.PackageId;
var normalizedPackageVersion = catalogEntry.NormalizedPackageVersion;

if (catalogEntry.Type.AbsoluteUri == Schema.DataTypes.PackageDetails.AbsoluteUri)
{
var properties = GetTelemetryProperties(catalogEntry);
var telemetryProperties = GetTelemetryProperties(catalogEntry);

using (_telemetryService.TrackDuration(TelemetryConstants.ProcessPackageDetailsSeconds, properties))
using (_telemetryService.TrackDuration(TelemetryConstants.ProcessPackageDetailsSeconds, telemetryProperties))
{
var packageFileName = PackageUtility.GetPackageFileName(
packageId,
Expand Down Expand Up @@ -127,6 +165,7 @@ await catalogEntries.ForEachAsync(_maxDegreeOfParallelism, async catalogEntry =>
packageId,
normalizedPackageVersion,
sourceUri,
telemetryProperties,
cancellationToken))
{
processedCatalogEntries.Add(catalogEntry);
Expand Down Expand Up @@ -170,7 +209,7 @@ private async Task UpdatePackageVersionIndexAsync(
{
var catalogEntryGroups = catalogEntries.GroupBy(catalogEntry => catalogEntry.PackageId);

await catalogEntryGroups.ForEachAsync(_maxDegreeOfParallelism, async catalogEntryGroup =>
await catalogEntryGroups.ForEachAsync(_maxConcurrentCommitItemsWithinBatch, async catalogEntryGroup =>
{
var packageId = catalogEntryGroup.Key;
var properties = new Dictionary<string, string>()
Expand Down Expand Up @@ -209,11 +248,13 @@ private async Task<bool> ProcessPackageDetailsAsync(
string packageId,
string normalizedPackageVersion,
Uri sourceUri,
Dictionary<string, string> telemetryProperties,
CancellationToken cancellationToken)
{
if (await ProcessPackageDetailsViaStorageAsync(
packageId,
normalizedPackageVersion,
telemetryProperties,
cancellationToken))
{
return true;
Expand All @@ -229,12 +270,14 @@ private async Task<bool> ProcessPackageDetailsAsync(
packageId,
normalizedPackageVersion,
sourceUri,
telemetryProperties,
cancellationToken);
}

private async Task<bool> ProcessPackageDetailsViaStorageAsync(
string packageId,
string normalizedPackageVersion,
Dictionary<string, string> telemetryProperties,
CancellationToken cancellationToken)
{
if (_sourceStorage == null)
Expand All @@ -255,6 +298,8 @@ private async Task<bool> ProcessPackageDetailsViaStorageAsync(
// If the ETag's differ, we'll fall back to using a single HTTP GET request.
var token1 = await _sourceStorage.GetOptimisticConcurrencyControlTokenAsync(sourceUri, cancellationToken);

telemetryProperties[TelemetryConstants.SizeInBytes] = sourceBlob.Length.ToString();

var nuspec = await GetNuspecAsync(sourceBlob, packageId, cancellationToken);

if (string.IsNullOrEmpty(nuspec))
Expand Down Expand Up @@ -306,6 +351,7 @@ private async Task<bool> ProcessPackageDetailsViaHttpAsync(
string id,
string version,
Uri sourceUri,
Dictionary<string, string> telemetryProperties,
CancellationToken cancellationToken)
{
var packageDownloader = new PackageDownloader(client, _logger);
Expand All @@ -320,6 +366,8 @@ private async Task<bool> ProcessPackageDetailsViaHttpAsync(
return false;
}

telemetryProperties[TelemetryConstants.SizeInBytes] = stream.Length.ToString();

var nuspec = GetNuspec(stream, id);

if (nuspec == null)
Expand Down Expand Up @@ -380,7 +428,7 @@ private static void AssertNoMultipleEntriesForSamePackageIdentity(
}
}

private static async Task<string> GetNuspecAsync(
private async Task<string> GetNuspecAsync(
ICloudBlockBlob sourceBlob,
string packageId,
CancellationToken cancellationToken)
Expand Down Expand Up @@ -438,6 +486,34 @@ private static Dictionary<string, string> GetTelemetryProperties(string packageI
};
}

/// <summary>
/// Processes one batch of catalog commit items and records how long the processing took.
/// </summary>
/// <remarks>
/// The duration is tracked under <c>TelemetryConstants.ProcessBatchSeconds</c>, tagged with the package ID
/// and the number of items in the batch.
/// NOTE(review): <paramref name="lastBatch"/> is never read here, <c>isLastBatch</c> is always passed as
/// <c>false</c>, and the value returned by <c>OnProcessBatchAsync</c> is discarded -- confirm this is intended.
/// </remarks>
/// <param name="client">The HTTP client forwarded to <c>OnProcessBatchAsync</c>.</param>
/// <param name="context">The context token forwarded to <c>OnProcessBatchAsync</c>.</param>
/// <param name="packageId">The package ID recorded in the telemetry properties.</param>
/// <param name="batch">The batch whose items and commit timestamp are processed.</param>
/// <param name="lastBatch">Not used by this method.</param>
/// <param name="cancellationToken">A token forwarded to <c>OnProcessBatchAsync</c>.</param>
private async Task ProcessBatchAsync(
    CollectorHttpClient client,
    JToken context,
    string packageId,
    CatalogCommitItemBatch batch,
    CatalogCommitItemBatch lastBatch,
    CancellationToken cancellationToken)
{
    // Yield immediately so the rest of the method runs as a continuation instead of inline in the caller.
    await Task.Yield();

    var batchProperties = new Dictionary<string, string>();
    batchProperties.Add(TelemetryConstants.Id, packageId);
    batchProperties.Add(TelemetryConstants.BatchItemCount, batch.Items.Count.ToString());

    using (_telemetryService.TrackDuration(TelemetryConstants.ProcessBatchSeconds, batchProperties))
    {
        await OnProcessBatchAsync(
            client,
            batch.Items,
            context,
            batch.CommitTimeStamp,
            isLastBatch: false,
            cancellationToken: cancellationToken);
    }
}

private sealed class CatalogEntry
{
internal DateTime CommitTimeStamp { get; }
Expand Down
1 change: 1 addition & 0 deletions src/Catalog/Persistence/AzureCloudBlockBlob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public string ContentMD5
}

public string ETag => _blob.Properties.ETag;
public long Length => _blob.Properties.Length;
public Uri Uri => _blob.Uri;

public AzureCloudBlockBlob(CloudBlockBlob blob)
Expand Down
71 changes: 31 additions & 40 deletions src/Catalog/Persistence/AzureStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class AzureStorage : Storage, IAzureStorage
{
private readonly bool _compressContent;
private readonly CloudBlobDirectory _directory;
private readonly BlobRequestOptions _blobRequestOptions;
private readonly bool _useServerSideCopy;

public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30);
Expand Down Expand Up @@ -72,22 +71,24 @@ private AzureStorage(CloudBlobDirectory directory, Uri baseAddress, TimeSpan max
{
_directory = directory;

// Unless overridden at the level of a single API call, these options will apply to all service calls that
// use BlobRequestOptions.
_directory.ServiceClient.DefaultRequestOptions = new BlobRequestOptions()
{
ServerTimeout = serverTimeout,
MaximumExecutionTime = maxExecutionTime,
RetryPolicy = new ExponentialRetry()
};

if (_directory.Container.CreateIfNotExists())
{
_directory.Container.SetPermissions(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });

if (Verbose)
{
Trace.WriteLine(String.Format("Created '{0}' publish container", _directory.Container.Name));
Trace.WriteLine(string.Format("Created '{0}' publish container", _directory.Container.Name));
}
}

_blobRequestOptions = new BlobRequestOptions()
{
ServerTimeout = serverTimeout,
MaximumExecutionTime = maxExecutionTime,
RetryPolicy = new ExponentialRetry()
};
}

public override async Task<OptimisticConcurrencyControlToken> GetOptimisticConcurrencyControlTokenAsync(
Expand Down Expand Up @@ -134,7 +135,7 @@ public override bool Exists(string fileName)
}
if (Verbose)
{
Trace.WriteLine(String.Format("The blob {0} does not exist.", packageRegistrationUri));
Trace.WriteLine(string.Format("The blob {0} does not exist.", packageRegistrationUri));
}
return false;
}
Expand Down Expand Up @@ -230,11 +231,7 @@ protected override async Task OnSaveAsync(Uri resourceUri, StorageContent conten

destinationStream.Seek(0, SeekOrigin.Begin);

await blob.UploadFromStreamAsync(destinationStream,
accessCondition: null,
options: _blobRequestOptions,
operationContext: null,
cancellationToken: cancellationToken);
await blob.UploadFromStreamAsync(destinationStream, cancellationToken);

Trace.WriteLine(string.Format("Saved compressed blob {0} to container {1}", blob.Uri.ToString(), _directory.Container.Name));
}
Expand All @@ -243,11 +240,7 @@ await blob.UploadFromStreamAsync(destinationStream,
{
using (Stream stream = content.GetContentStream())
{
await blob.UploadFromStreamAsync(stream,
accessCondition: null,
options: _blobRequestOptions,
operationContext: null,
cancellationToken: cancellationToken);
await blob.UploadFromStreamAsync(stream, cancellationToken);
}

Trace.WriteLine(string.Format("Saved uncompressed blob {0} to container {1}", blob.Uri.ToString(), _directory.Container.Name));
Expand Down Expand Up @@ -306,32 +299,30 @@ protected override async Task<StorageContent> OnLoadAsync(Uri resourceUri, Cance

if (blob.Exists())
{
MemoryStream originalStream = new MemoryStream();
await blob.DownloadToStreamAsync(originalStream,
accessCondition: null,
options: _blobRequestOptions,
operationContext: null,
cancellationToken: cancellationToken);

originalStream.Seek(0, SeekOrigin.Begin);

string content;

if (blob.Properties.ContentEncoding == "gzip")
using (var originalStream = new MemoryStream())
{
using (var uncompressedStream = new GZipStream(originalStream, CompressionMode.Decompress))
await blob.DownloadToStreamAsync(originalStream, cancellationToken);

originalStream.Seek(0, SeekOrigin.Begin);

if (blob.Properties.ContentEncoding == "gzip")
{
using (var reader = new StreamReader(uncompressedStream))
using (var uncompressedStream = new GZipStream(originalStream, CompressionMode.Decompress))
{
content = await reader.ReadToEndAsync();
using (var reader = new StreamReader(uncompressedStream))
{
content = await reader.ReadToEndAsync();
}
}
}
}
else
{
using (var reader = new StreamReader(originalStream))
else
{
content = await reader.ReadToEndAsync();
using (var reader = new StreamReader(originalStream))
{
content = await reader.ReadToEndAsync();
}
}
}

Expand All @@ -340,7 +331,7 @@ await blob.DownloadToStreamAsync(originalStream,

if (Verbose)
{
Trace.WriteLine(String.Format("Can't load '{0}'. Blob doesn't exist", resourceUri));
Trace.WriteLine(string.Format("Can't load '{0}'. Blob doesn't exist", resourceUri));
}

return null;
Expand All @@ -353,7 +344,7 @@ protected override async Task OnDeleteAsync(Uri resourceUri, CancellationToken c
CloudBlockBlob blob = _directory.GetBlockBlobReference(name);
await blob.DeleteAsync(deleteSnapshotsOption: DeleteSnapshotsOption.IncludeSnapshots,
accessCondition: null,
options: _blobRequestOptions,
options: null,
operationContext: null,
cancellationToken: cancellationToken);
}
Expand Down
1 change: 1 addition & 0 deletions src/Catalog/Persistence/ICloudBlockBlob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface ICloudBlockBlob
{
string ContentMD5 { get; set; }
string ETag { get; }
long Length { get; }
Uri Uri { get; }

Task<bool> ExistsAsync(CancellationToken cancellationToken);
Expand Down
1 change: 1 addition & 0 deletions src/Catalog/Telemetry/TelemetryConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static class TelemetryConstants
public const string ProcessPackageDetailsSeconds = "ProcessPackageDetailsSeconds";
public const string ProcessPackageVersionIndexSeconds = "ProcessPackageVersionIndexSeconds";
public const string RegistrationDeltaCount = "RegistrationDeltaCount";
public const string SizeInBytes = "SizeInBytes";
public const string StatusCode = "StatusCode";
public const string Success = "Success";
public const string Uri = "Uri";
Expand Down
2 changes: 2 additions & 0 deletions src/Ng/Jobs/Catalog2DnxJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ protected override void Init(IDictionary<string, string> arguments, Cancellation
preferredPackageSourceStorageFactory);
Logger.LogInformation("HTTP client timeout: {Timeout}", httpClientTimeout);

MaxDegreeOfParallelism = 256;

_collector = new DnxCatalogCollector(
new Uri(source),
storageFactory,
Expand Down
2 changes: 1 addition & 1 deletion src/Ng/Jobs/NgJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public abstract class NgJob
protected ILoggerFactory LoggerFactory;
protected ILogger Logger;

protected int MaxDegreeOfParallelism { get; }
protected int MaxDegreeOfParallelism { get; set; }

protected NgJob(ITelemetryService telemetryService, ILoggerFactory loggerFactory)
{
Expand Down
Loading

0 comments on commit ae0d22f

Please sign in to comment.