From 43e871f9aec32ad0832729e9fe139620d2d6c0e8 Mon Sep 17 00:00:00 2001 From: Joel Verhagen Date: Thu, 19 Dec 2019 16:40:12 -0800 Subject: [PATCH] Add registration collector logic to handle an entire batch of catalog leaves (#724) Progress on https://github.com/nuget/nugetgallery/issues/7739 --- .../NuGet.Jobs.Catalog2Registration.csproj | 1 + .../RegistrationCollectorLogic.cs | 84 ++++++ ...Get.Jobs.Catalog2Registration.Tests.csproj | 1 + .../RegistrationCollectorLogicFacts.cs | 241 ++++++++++++++++++ 4 files changed, 327 insertions(+) create mode 100644 src/NuGet.Jobs.Catalog2Registration/RegistrationCollectorLogic.cs create mode 100644 tests/NuGet.Jobs.Catalog2Registration.Tests/RegistrationCollectorLogicFacts.cs diff --git a/src/NuGet.Jobs.Catalog2Registration/NuGet.Jobs.Catalog2Registration.csproj b/src/NuGet.Jobs.Catalog2Registration/NuGet.Jobs.Catalog2Registration.csproj index 7a67a47ab..96de53942 100644 --- a/src/NuGet.Jobs.Catalog2Registration/NuGet.Jobs.Catalog2Registration.csproj +++ b/src/NuGet.Jobs.Catalog2Registration/NuGet.Jobs.Catalog2Registration.csproj @@ -64,6 +64,7 @@ + diff --git a/src/NuGet.Jobs.Catalog2Registration/RegistrationCollectorLogic.cs b/src/NuGet.Jobs.Catalog2Registration/RegistrationCollectorLogic.cs new file mode 100644 index 000000000..48b7c5b57 --- /dev/null +++ b/src/NuGet.Jobs.Catalog2Registration/RegistrationCollectorLogic.cs @@ -0,0 +1,84 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NuGet.Services; +using NuGet.Services.Metadata.Catalog; +using NuGet.Services.Metadata.Catalog.Helpers; +using NuGet.Services.V3; + +namespace NuGet.Jobs.Catalog2Registration +{ + public class RegistrationCollectorLogic : ICommitCollectorLogic + { + private readonly CommitCollectorUtility _utility; + private readonly IRegistrationUpdater _updater; + private readonly IOptionsSnapshot _options; + private readonly ILogger _logger; + + public RegistrationCollectorLogic( + CommitCollectorUtility utility, + IRegistrationUpdater updater, + IOptionsSnapshot options, + ILogger logger) + { + _utility = utility ?? throw new ArgumentNullException(nameof(utility)); + _updater = updater ?? throw new ArgumentNullException(nameof(updater)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + if (_options.Value.MaxConcurrentIds <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(options), + $"The {nameof(Catalog2RegistrationConfiguration.MaxConcurrentIds)} must be greater than zero."); + } + } + + public Task> CreateBatchesAsync(IEnumerable catalogItems) + { + // Create a single batch of all unprocessed catalog items so that we can have complete control of the + // parallelism in this class. + return Task.FromResult(_utility.CreateSingleBatch(catalogItems)); + } + + public async Task OnProcessBatchAsync(IEnumerable items) + { + var itemList = items.ToList(); + _logger.LogInformation("Got {Count} catalog commit items to process.", itemList.Count); + + var latestItems = _utility.GetLatestPerIdentity(itemList); + _logger.LogInformation("Got {Count} unique package identities.", latestItems.Count); + + var allWork = _utility.GroupById(latestItems); + _logger.LogInformation("Got {Count} unique IDs.", allWork.Count); + + var allEntryToLeaf = await _utility.GetEntryToDetailsLeafAsync(latestItems); + _logger.LogInformation("Fetched {Count} package details leaves.", allEntryToLeaf.Count); + + _logger.LogInformation("Starting {Count} workers processing each package ID batch.", _options.Value.MaxConcurrentIds); + await ParallelAsync.Repeat( + async () => + { + await Task.Yield(); + while (allWork.TryTake(out var work)) + { + var entryToLeaf = work + .Value + .Where(CommitCollectorUtility.IsOnlyPackageDetails) + .ToDictionary(e => e, e => allEntryToLeaf[e], ReferenceEqualityComparer.Default); + + await _updater.UpdateAsync(work.Id, work.Value, entryToLeaf); + } + }, + _options.Value.MaxConcurrentIds); + + _logger.LogInformation("All workers have completed successfully."); + } + } +} diff --git a/tests/NuGet.Jobs.Catalog2Registration.Tests/NuGet.Jobs.Catalog2Registration.Tests.csproj b/tests/NuGet.Jobs.Catalog2Registration.Tests/NuGet.Jobs.Catalog2Registration.Tests.csproj index 4ab77f9d2..8ed1ad766 100644 --- a/tests/NuGet.Jobs.Catalog2Registration.Tests/NuGet.Jobs.Catalog2Registration.Tests.csproj +++ b/tests/NuGet.Jobs.Catalog2Registration.Tests/NuGet.Jobs.Catalog2Registration.Tests.csproj @@ -64,6 +64,7 @@ + diff --git a/tests/NuGet.Jobs.Catalog2Registration.Tests/RegistrationCollectorLogicFacts.cs b/tests/NuGet.Jobs.Catalog2Registration.Tests/RegistrationCollectorLogicFacts.cs new file mode 100644 index 000000000..db79e44dc --- /dev/null +++ b/tests/NuGet.Jobs.Catalog2Registration.Tests/RegistrationCollectorLogicFacts.cs @@ -0,0 +1,241 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Moq; +using NuGet.Packaging.Core; +using NuGet.Protocol.Catalog; +using NuGet.Services.Metadata.Catalog; +using NuGet.Services.V3; +using NuGet.Versioning; +using Xunit; +using Xunit.Abstractions; + +namespace NuGet.Jobs.Catalog2Registration +{ + public class RegistrationCollectorLogicFacts + { + public class CreateBatchesAsync : Facts + { + public CreateBatchesAsync(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task SingleBatchWithAllItems() + { + var items = new[] + { + new CatalogCommitItem( + uri: null, + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 1), + types: null, + typeUris: new List(), + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + new CatalogCommitItem( + uri: null, + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 2), + types: null, + typeUris: new List(), + packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("2.0.0"))), + }; + + var batches = await Target.CreateBatchesAsync(items); + + var batch = Assert.Single(batches); + Assert.Equal(2, batch.Items.Count); + Assert.Equal(new DateTime(2018, 1, 2), batch.CommitTimeStamp); + Assert.Equal(items[0], batch.Items[0]); + Assert.Equal(items[1], batch.Items[1]); + } + } + + public class OnProcessBatchAsync : Facts + { + public OnProcessBatchAsync(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task DoesNotFetchLeavesForDeleteEntries() + { + var items = new[] + { + new CatalogCommitItem( + uri: new Uri("https://example/0"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 1), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + new CatalogCommitItem( + uri: new Uri("https://example/1"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 2), + types: null, + typeUris: new List { Schema.DataTypes.PackageDelete }, + packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("2.0.0"))), + }; + + await Target.OnProcessBatchAsync(items); + + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/0"), Times.Once); + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync(It.IsAny()), Times.Exactly(1)); + CatalogClient.Verify(x => x.GetPackageDeleteLeafAsync(It.IsAny()), Times.Never); + + RegistrationUpdater.Verify( + x => x.UpdateAsync( + "NuGet.Versioning", + It.Is>( + y => y.Count == 1), + It.Is>( + y => y.Count == 1)), + Times.Once); + RegistrationUpdater.Verify( + x => x.UpdateAsync( + "NuGet.Frameworks", + It.Is>( + y => y.Count == 1), + It.Is>( + y => y.Count == 0)), + Times.Once); + } + + [Fact] + public async Task OperatesOnLatestPerPackageIdentityAndGroupsById() + { + var items = new[] + { + new CatalogCommitItem( + uri: new Uri("https://example/0"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 1), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + new CatalogCommitItem( + uri: new Uri("https://example/1"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 2), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + new CatalogCommitItem( + uri: new Uri("https://example/2"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 2), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("2.0.0"))), + new CatalogCommitItem( + uri: new Uri("https://example/3"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 2), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("1.0.0"))), + }; + + await Target.OnProcessBatchAsync(items); + + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/1"), Times.Once); + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/2"), Times.Once); + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/3"), Times.Once); + CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync(It.IsAny()), Times.Exactly(3)); + + RegistrationUpdater.Verify( + x => x.UpdateAsync( + "NuGet.Versioning", + It.Is>( + y => y.Count == 2), + It.Is>( + y => y.Count == 2)), + Times.Once); + RegistrationUpdater.Verify( + x => x.UpdateAsync( + "NuGet.Frameworks", + It.Is>( + y => y.Count == 1), + It.Is>( + y => y.Count == 1)), + Times.Once); + } + + [Fact] + public async Task RejectsMultipleLeavesForTheSamePackageAtTheSameTime() + { + var items = new[] + { + new CatalogCommitItem( + uri: new Uri("https://example/0"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 1), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + new CatalogCommitItem( + uri: new Uri("https://example/1"), + commitId: null, + commitTimeStamp: new DateTime(2018, 1, 1), + types: null, + typeUris: new List { Schema.DataTypes.PackageDetails }, + packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), + }; + + var ex = await Assert.ThrowsAsync( + () => Target.OnProcessBatchAsync(items)); + + Assert.Equal( + "There are multiple catalog leaves for a single package at one time.", + ex.Message); + RegistrationUpdater.Verify( + x => x.UpdateAsync( + It.IsAny(), + It.IsAny>(), + It.IsAny>()), + Times.Never); + } + } + + public abstract class Facts + { + public Facts(ITestOutputHelper output) + { + CatalogClient = new Mock(); + V3TelemetryService = new Mock(); + CommitCollectorOptions = new Mock>(); + RegistrationUpdater = new Mock(); + Options = new Mock>(); + + CommitCollectorConfiguration = new CommitCollectorConfiguration { MaxConcurrentCatalogLeafDownloads = 1 }; + CommitCollectorOptions.Setup(x => x.Value).Returns(() => CommitCollectorConfiguration); + Configuration = new Catalog2RegistrationConfiguration { MaxConcurrentIds = 1 }; + Options.Setup(x => x.Value).Returns(() => Configuration); + + Target = new RegistrationCollectorLogic( + new CommitCollectorUtility( + CatalogClient.Object, + V3TelemetryService.Object, + CommitCollectorOptions.Object, + output.GetLogger()), + RegistrationUpdater.Object, + Options.Object, + output.GetLogger()); + } + + public Mock CatalogClient { get; } + public Mock V3TelemetryService { get; } + public Mock> CommitCollectorOptions { get; } + public Mock RegistrationUpdater { get; } + public Mock> Options { get; } + public CommitCollectorConfiguration CommitCollectorConfiguration { get; } + public Catalog2RegistrationConfiguration Configuration { get; } + public RegistrationCollectorLogic Target { get; } + } + } +}