This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add registration collector logic to handle an entire batch of catalog leaves (#724)
Progress on NuGet/NuGetGallery#7739
- Loading branch information
1 parent
359bf1e
commit 43e871f
Showing 4 changed files with 327 additions and 0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
84 changes: 84 additions & 0 deletions
84
src/NuGet.Jobs.Catalog2Registration/RegistrationCollectorLogic.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using Microsoft.Extensions.Logging; | ||
using Microsoft.Extensions.Options; | ||
using NuGet.Services; | ||
using NuGet.Services.Metadata.Catalog; | ||
using NuGet.Services.Metadata.Catalog.Helpers; | ||
using NuGet.Services.V3; | ||
|
||
namespace NuGet.Jobs.Catalog2Registration | ||
{ | ||
public class RegistrationCollectorLogic : ICommitCollectorLogic | ||
{ | ||
private readonly CommitCollectorUtility _utility; | ||
private readonly IRegistrationUpdater _updater; | ||
private readonly IOptionsSnapshot<Catalog2RegistrationConfiguration> _options; | ||
private readonly ILogger<RegistrationCollectorLogic> _logger; | ||
|
||
public RegistrationCollectorLogic( | ||
CommitCollectorUtility utility, | ||
IRegistrationUpdater updater, | ||
IOptionsSnapshot<Catalog2RegistrationConfiguration> options, | ||
ILogger<RegistrationCollectorLogic> logger) | ||
{ | ||
_utility = utility ?? throw new ArgumentNullException(nameof(utility)); | ||
_updater = updater ?? throw new ArgumentNullException(nameof(updater)); | ||
_options = options ?? throw new ArgumentNullException(nameof(options)); | ||
_logger = logger ?? throw new ArgumentNullException(nameof(logger)); | ||
|
||
if (_options.Value.MaxConcurrentIds <= 0) | ||
{ | ||
throw new ArgumentOutOfRangeException( | ||
nameof(options), | ||
$"The {nameof(Catalog2RegistrationConfiguration.MaxConcurrentIds)} must be greater than zero."); | ||
} | ||
} | ||
|
||
public Task<IEnumerable<CatalogCommitItemBatch>> CreateBatchesAsync(IEnumerable<CatalogCommitItem> catalogItems) | ||
{ | ||
// Create a single batch of all unprocessed catalog items so that we can have complete control of the | ||
// parallelism in this class. | ||
return Task.FromResult(_utility.CreateSingleBatch(catalogItems)); | ||
} | ||
|
||
public async Task OnProcessBatchAsync(IEnumerable<CatalogCommitItem> items) | ||
{ | ||
var itemList = items.ToList(); | ||
_logger.LogInformation("Got {Count} catalog commit items to process.", itemList.Count); | ||
|
||
var latestItems = _utility.GetLatestPerIdentity(itemList); | ||
_logger.LogInformation("Got {Count} unique package identities.", latestItems.Count); | ||
|
||
var allWork = _utility.GroupById(latestItems); | ||
_logger.LogInformation("Got {Count} unique IDs.", allWork.Count); | ||
|
||
var allEntryToLeaf = await _utility.GetEntryToDetailsLeafAsync(latestItems); | ||
_logger.LogInformation("Fetched {Count} package details leaves.", allEntryToLeaf.Count); | ||
|
||
_logger.LogInformation("Starting {Count} workers processing each package ID batch.", _options.Value.MaxConcurrentIds); | ||
await ParallelAsync.Repeat( | ||
async () => | ||
{ | ||
await Task.Yield(); | ||
while (allWork.TryTake(out var work)) | ||
{ | ||
var entryToLeaf = work | ||
.Value | ||
.Where(CommitCollectorUtility.IsOnlyPackageDetails) | ||
.ToDictionary(e => e, e => allEntryToLeaf[e], ReferenceEqualityComparer<CatalogCommitItem>.Default); | ||
|
||
await _updater.UpdateAsync(work.Id, work.Value, entryToLeaf); | ||
} | ||
}, | ||
_options.Value.MaxConcurrentIds); | ||
|
||
_logger.LogInformation("All workers have completed successfully."); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
241 changes: 241 additions & 0 deletions
241
tests/NuGet.Jobs.Catalog2Registration.Tests/RegistrationCollectorLogicFacts.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
// Copyright (c) .NET Foundation. All rights reserved. | ||
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. | ||
|
||
using System; | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using Microsoft.Extensions.Options; | ||
using Moq; | ||
using NuGet.Packaging.Core; | ||
using NuGet.Protocol.Catalog; | ||
using NuGet.Services.Metadata.Catalog; | ||
using NuGet.Services.V3; | ||
using NuGet.Versioning; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
namespace NuGet.Jobs.Catalog2Registration | ||
{ | ||
public class RegistrationCollectorLogicFacts | ||
{ | ||
public class CreateBatchesAsync : Facts | ||
{ | ||
public CreateBatchesAsync(ITestOutputHelper output) : base(output) | ||
{ | ||
} | ||
|
||
[Fact] | ||
public async Task SingleBatchWithAllItems() | ||
{ | ||
var items = new[] | ||
{ | ||
new CatalogCommitItem( | ||
uri: null, | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 1), | ||
types: null, | ||
typeUris: new List<Uri>(), | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
new CatalogCommitItem( | ||
uri: null, | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 2), | ||
types: null, | ||
typeUris: new List<Uri>(), | ||
packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("2.0.0"))), | ||
}; | ||
|
||
var batches = await Target.CreateBatchesAsync(items); | ||
|
||
var batch = Assert.Single(batches); | ||
Assert.Equal(2, batch.Items.Count); | ||
Assert.Equal(new DateTime(2018, 1, 2), batch.CommitTimeStamp); | ||
Assert.Equal(items[0], batch.Items[0]); | ||
Assert.Equal(items[1], batch.Items[1]); | ||
} | ||
} | ||
|
||
public class OnProcessBatchAsync : Facts | ||
{ | ||
public OnProcessBatchAsync(ITestOutputHelper output) : base(output) | ||
{ | ||
} | ||
|
||
[Fact] | ||
public async Task DoesNotFetchLeavesForDeleteEntries() | ||
{ | ||
var items = new[] | ||
{ | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/0"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 1), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/1"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 2), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDelete }, | ||
packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("2.0.0"))), | ||
}; | ||
|
||
await Target.OnProcessBatchAsync(items); | ||
|
||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/0"), Times.Once); | ||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync(It.IsAny<string>()), Times.Exactly(1)); | ||
CatalogClient.Verify(x => x.GetPackageDeleteLeafAsync(It.IsAny<string>()), Times.Never); | ||
|
||
RegistrationUpdater.Verify( | ||
x => x.UpdateAsync( | ||
"NuGet.Versioning", | ||
It.Is<IReadOnlyList<CatalogCommitItem>>( | ||
y => y.Count == 1), | ||
It.Is<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>>( | ||
y => y.Count == 1)), | ||
Times.Once); | ||
RegistrationUpdater.Verify( | ||
x => x.UpdateAsync( | ||
"NuGet.Frameworks", | ||
It.Is<IReadOnlyList<CatalogCommitItem>>( | ||
y => y.Count == 1), | ||
It.Is<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>>( | ||
y => y.Count == 0)), | ||
Times.Once); | ||
} | ||
|
||
[Fact] | ||
public async Task OperatesOnLatestPerPackageIdentityAndGroupsById() | ||
{ | ||
var items = new[] | ||
{ | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/0"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 1), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/1"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 2), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/2"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 2), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("2.0.0"))), | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/3"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 2), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Frameworks", NuGetVersion.Parse("1.0.0"))), | ||
}; | ||
|
||
await Target.OnProcessBatchAsync(items); | ||
|
||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/1"), Times.Once); | ||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/2"), Times.Once); | ||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync("https://example/3"), Times.Once); | ||
CatalogClient.Verify(x => x.GetPackageDetailsLeafAsync(It.IsAny<string>()), Times.Exactly(3)); | ||
|
||
RegistrationUpdater.Verify( | ||
x => x.UpdateAsync( | ||
"NuGet.Versioning", | ||
It.Is<IReadOnlyList<CatalogCommitItem>>( | ||
y => y.Count == 2), | ||
It.Is<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>>( | ||
y => y.Count == 2)), | ||
Times.Once); | ||
RegistrationUpdater.Verify( | ||
x => x.UpdateAsync( | ||
"NuGet.Frameworks", | ||
It.Is<IReadOnlyList<CatalogCommitItem>>( | ||
y => y.Count == 1), | ||
It.Is<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>>( | ||
y => y.Count == 1)), | ||
Times.Once); | ||
} | ||
|
||
[Fact] | ||
public async Task RejectsMultipleLeavesForTheSamePackageAtTheSameTime() | ||
{ | ||
var items = new[] | ||
{ | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/0"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 1), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
new CatalogCommitItem( | ||
uri: new Uri("https://example/1"), | ||
commitId: null, | ||
commitTimeStamp: new DateTime(2018, 1, 1), | ||
types: null, | ||
typeUris: new List<Uri> { Schema.DataTypes.PackageDetails }, | ||
packageIdentity: new PackageIdentity("NuGet.Versioning", NuGetVersion.Parse("1.0.0"))), | ||
}; | ||
|
||
var ex = await Assert.ThrowsAsync<InvalidOperationException>( | ||
() => Target.OnProcessBatchAsync(items)); | ||
|
||
Assert.Equal( | ||
"There are multiple catalog leaves for a single package at one time.", | ||
ex.Message); | ||
RegistrationUpdater.Verify( | ||
x => x.UpdateAsync( | ||
It.IsAny<string>(), | ||
It.IsAny<IReadOnlyList<CatalogCommitItem>>(), | ||
It.IsAny<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>>()), | ||
Times.Never); | ||
} | ||
} | ||
|
||
public abstract class Facts | ||
{ | ||
public Facts(ITestOutputHelper output) | ||
{ | ||
CatalogClient = new Mock<ICatalogClient>(); | ||
V3TelemetryService = new Mock<IV3TelemetryService>(); | ||
CommitCollectorOptions = new Mock<IOptionsSnapshot<CommitCollectorConfiguration>>(); | ||
RegistrationUpdater = new Mock<IRegistrationUpdater>(); | ||
Options = new Mock<IOptionsSnapshot<Catalog2RegistrationConfiguration>>(); | ||
|
||
CommitCollectorConfiguration = new CommitCollectorConfiguration { MaxConcurrentCatalogLeafDownloads = 1 }; | ||
CommitCollectorOptions.Setup(x => x.Value).Returns(() => CommitCollectorConfiguration); | ||
Configuration = new Catalog2RegistrationConfiguration { MaxConcurrentIds = 1 }; | ||
Options.Setup(x => x.Value).Returns(() => Configuration); | ||
|
||
Target = new RegistrationCollectorLogic( | ||
new CommitCollectorUtility( | ||
CatalogClient.Object, | ||
V3TelemetryService.Object, | ||
CommitCollectorOptions.Object, | ||
output.GetLogger<CommitCollectorUtility>()), | ||
RegistrationUpdater.Object, | ||
Options.Object, | ||
output.GetLogger<RegistrationCollectorLogic>()); | ||
} | ||
|
||
public Mock<ICatalogClient> CatalogClient { get; } | ||
public Mock<IV3TelemetryService> V3TelemetryService { get; } | ||
public Mock<IOptionsSnapshot<CommitCollectorConfiguration>> CommitCollectorOptions { get; } | ||
public Mock<IRegistrationUpdater> RegistrationUpdater { get; } | ||
public Mock<IOptionsSnapshot<Catalog2RegistrationConfiguration>> Options { get; } | ||
public CommitCollectorConfiguration CommitCollectorConfiguration { get; } | ||
public Catalog2RegistrationConfiguration Configuration { get; } | ||
public RegistrationCollectorLogic Target { get; } | ||
} | ||
} | ||
} |