Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
/ NuGet.Jobs Public archive

Commit

Permalink
Extract collector DI and registration client to generic V3 project (#677)
Browse files Browse the repository at this point in the history

This supports building other collectors that, like Catalog2AzureSearch, use dependency injection.
Also, the registration client code is not Azure Search-specific, so a more generic place is better.
This PR only moves code around and has no impact on behavior.
It allows me to share code between other new collectors and Catalog2AzureSearch without pulling in all of the Azure Search dependencies.
  • Loading branch information
joelverhagen committed Oct 22, 2019
1 parent 7a4b957 commit d0f6356
Show file tree
Hide file tree
Showing 72 changed files with 662 additions and 289 deletions.
30 changes: 30 additions & 0 deletions NuGet.Services.Metadata.sln
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
build.ps1 = build.ps1
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.V3", "src\NuGet.Services.V3\NuGet.Services.V3.csproj", "{C3F9A738-9759-4B2B-A50D-6507B28A659B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Services.V3.Tests", "tests\NuGet.Services.V3.Tests\NuGet.Services.V3.Tests.csproj", "{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -321,6 +325,30 @@ Global
{7E6903A4-DBE1-444E-A8E3-C1DBB58243E0}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{7E6903A4-DBE1-444E-A8E3-C1DBB58243E0}.Release|x64.ActiveCfg = Release|Any CPU
{7E6903A4-DBE1-444E-A8E3-C1DBB58243E0}.Release|x64.Build.0 = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|x64.ActiveCfg = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Debug|x64.Build.0 = Debug|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|Any CPU.Build.0 = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|x64.ActiveCfg = Release|Any CPU
{C3F9A738-9759-4B2B-A50D-6507B28A659B}.Release|x64.Build.0 = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|x64.ActiveCfg = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Debug|x64.Build.0 = Debug|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|Any CPU.Build.0 = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|x64.ActiveCfg = Release|Any CPU
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -347,6 +375,8 @@ Global
{F009209D-A663-45E1-87E8-158569A0F097} = {F1C83FD9-A498-483E-ADFA-B55D82A14965}
{F81A7CA6-97D4-4958-A9C1-FF94FCA283CB} = {C86C6DEE-84E1-4E4E-8868-6755D7A8E0E4}
{7E6903A4-DBE1-444E-A8E3-C1DBB58243E0} = {C86C6DEE-84E1-4E4E-8868-6755D7A8E0E4}
{C3F9A738-9759-4B2B-A50D-6507B28A659B} = {5DE01C58-D5F7-482F-8256-A8333064384C}
{CCB4D5EF-AC84-449D-AC6E-0A0AD295483A} = {F1C83FD9-A498-483E-ADFA-B55D82A14965}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D3AB83E9-02B4-4FFA-A2D0-637F0B97E626}
Expand Down
2 changes: 2 additions & 0 deletions src/NuGet.Jobs.Catalog2AzureSearch/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.DependencyInjection;
using NuGet.Services.AzureSearch;
using NuGet.Services.AzureSearch.Catalog2AzureSearch;
using NuGet.Services.V3;

namespace NuGet.Jobs
{
Expand All @@ -17,6 +18,7 @@ protected override void ConfigureJobServices(IServiceCollection services, IConfi
base.ConfigureJobServices(services, configurationRoot);

services.Configure<Catalog2AzureSearchConfiguration>(configurationRoot.GetSection(ConfigurationSectionName));
services.Configure<CommitCollectorConfiguration>(configurationRoot.GetSection(ConfigurationSectionName));
services.Configure<AzureSearchJobConfiguration>(configurationRoot.GetSection(ConfigurationSectionName));
services.Configure<AzureSearchConfiguration>(configurationRoot.GetSection(ConfigurationSectionName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
<Project>{1a53fe3d-8041-4773-942f-d73aef5b82b2}</Project>
<Name>NuGet.Services.AzureSearch</Name>
</ProjectReference>
<ProjectReference Include="..\NuGet.Services.V3\NuGet.Services.V3.csproj">
<Project>{C3F9A738-9759-4B2B-A50D-6507B28A659B}</Project>
<Name>NuGet.Services.V3</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<PropertyGroup>
Expand Down
10 changes: 0 additions & 10 deletions src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,6 @@ public void TrackGetSearchServiceStatus(SearchStatusOptions options, bool succes
});
}

public IDisposable TrackCatalogLeafDownloadBatch(int count)
{
return _telemetryClient.TrackDuration(
Prefix + "CatalogLeafDownloadBatchSeconds",
new Dictionary<string, string>
{
{ "Count", count.ToString() },
});
}

public void TrackDocumentCountQuery(string indexName, long count, TimeSpan elapsed)
{
_telemetryClient.TrackMetric(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,30 @@
using Microsoft.Extensions.Options;
using NuGet.Protocol.Catalog;
using NuGet.Services.Metadata.Catalog;
using NuGet.Services.V3;

namespace NuGet.Services.AzureSearch.Catalog2AzureSearch
{
public class AzureSearchCollectorLogic : ICommitCollectorLogic
{
private readonly ICatalogClient _catalogClient;
private readonly ICatalogIndexActionBuilder _indexActionBuilder;
private readonly Func<IBatchPusher> _batchPusherFactory;
private readonly CommitCollectorUtility _utility;
private readonly IOptionsSnapshot<Catalog2AzureSearchConfiguration> _options;
private readonly IAzureSearchTelemetryService _telemetryService;
private readonly ILogger<AzureSearchCollectorLogic> _logger;

public AzureSearchCollectorLogic(
ICatalogClient catalogClient,
ICatalogIndexActionBuilder indexActionBuilder,
Func<IBatchPusher> batchPusherFactory,
CommitCollectorUtility utility,
IOptionsSnapshot<Catalog2AzureSearchConfiguration> options,
IAzureSearchTelemetryService telemetryService,
ILogger<AzureSearchCollectorLogic> logger)
{
_catalogClient = catalogClient ?? throw new ArgumentNullException(nameof(catalogClient));
_indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
_batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
_utility = utility ?? throw new ArgumentNullException(nameof(utility));
_options = options ?? throw new ArgumentNullException(nameof(options));
_telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand All @@ -43,51 +44,21 @@ public AzureSearchCollectorLogic(
nameof(options),
$"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero.");
}

if (_options.Value.MaxConcurrentCatalogLeafDownloads <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(options),
$"The {nameof(Catalog2AzureSearchConfiguration.MaxConcurrentCatalogLeafDownloads)} must be greater than zero.");
}
}

public Task<IEnumerable<CatalogCommitItemBatch>> CreateBatchesAsync(IEnumerable<CatalogCommitItem> catalogItems)
{
if (!catalogItems.Any())
{
return Task.FromResult(Enumerable.Empty<CatalogCommitItemBatch>());
}

var maxCommitTimestamp = catalogItems.Max(x => x.CommitTimeStamp);

// Create a single batch of all unprocessed catalog items so that we can have complete control of the
// parallelism in this class.
return Task.FromResult<IEnumerable<CatalogCommitItemBatch>>(new[]
{
new CatalogCommitItemBatch(
catalogItems,
key: null,
commitTimestamp: maxCommitTimestamp),
});
return Task.FromResult(_utility.CreateSingleBatch(catalogItems));
}

public async Task OnProcessBatchAsync(
IEnumerable<CatalogCommitItem> items)
{
var itemList = items.ToList();

// Ignore all but the latest catalog commit items per package identity.
var latestItems = items
.GroupBy(x => x.PackageIdentity)
.Select(GetLatest)
.ToList();

// Group the catalog commit items by package ID.
var workEnumerable = latestItems
.GroupBy(x => x.PackageIdentity.Id, StringComparer.OrdinalIgnoreCase)
.Select(x => new Work(x.Key, x.ToList()));
var allWork = new ConcurrentBag<Work>(workEnumerable);
var latestItems = _utility.GetLatestPerIdentity(items);
var allWork = _utility.GroupById(latestItems);

using (_telemetryService.TrackCatalog2AzureSearchProcessBatch(itemList.Count, latestItems.Count, allWork.Count))
{
Expand All @@ -109,10 +80,10 @@ public async Task OnProcessBatchAsync(

private async Task<ConcurrentBag<IdAndValue<IndexActions>>> ProcessWorkAsync(
IReadOnlyList<CatalogCommitItem> latestItems,
ConcurrentBag<Work> allWork)
ConcurrentBag<IdAndValue<IReadOnlyList<CatalogCommitItem>>> allWork)
{
// Fetch the full catalog leaf for each item that is the package details type.
var allEntryToLeaf = await GetEntryToLeafAsync(latestItems);
var allEntryToLeaf = await _utility.GetEntryToDetailsLeafAsync(latestItems);

// Process the package ID groups in parallel, collecting all index actions for later.
var output = new ConcurrentBag<IdAndValue<IndexActions>>();
Expand All @@ -124,11 +95,11 @@ private async Task<ConcurrentBag<IdAndValue<IndexActions>>> ProcessWorkAsync(
while (allWork.TryTake(out var work))
{
var entryToLeaf = work
.Entries
.Where(IsOnlyPackageDetails)
.Value
.Where(CommitCollectorUtility.IsOnlyPackageDetails)
.ToDictionary(e => e, e => allEntryToLeaf[e], ReferenceEqualityComparer<CatalogCommitItem>.Default);
var indexActions = await GetPackageIdIndexActionsAsync(work.Entries, entryToLeaf);
output.Add(new IdAndValue<IndexActions>(work.PackageId, indexActions));
var indexActions = await GetPackageIdIndexActionsAsync(work.Value, entryToLeaf);
output.Add(new IdAndValue<IndexActions>(work.Id, indexActions));
}
})
.ToList();
Expand All @@ -137,97 +108,6 @@ private async Task<ConcurrentBag<IdAndValue<IndexActions>>> ProcessWorkAsync(
return output;
}

private CatalogCommitItem GetLatest(IEnumerable<CatalogCommitItem> entries)
{
CatalogCommitItem max = null;
foreach (var entry in entries)
{
if (max == null)
{
max = entry;
continue;
}

Guard.Assert(
StringComparer.OrdinalIgnoreCase.Equals(max.PackageIdentity, entry.PackageIdentity),
"The entries compared should have the same package identity.");

if (entry.CommitTimeStamp > max.CommitTimeStamp)
{
max = entry;
}
else if (entry.CommitTimeStamp == max.CommitTimeStamp)
{
const string message = "There are multiple catalog leaves for a single package at one time.";
_logger.LogError(
message + " ID: {PackageId}, version: {PackageVersion}, commit timestamp: {CommitTimestamp:O}",
entry.PackageIdentity.Id,
entry.PackageIdentity.Version.ToFullString(),
entry.CommitTimeStamp);
throw new InvalidOperationException(message);
}
}

return max;
}

private async Task<IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf>> GetEntryToLeafAsync(
IEnumerable<CatalogCommitItem> entries)
{
var packageDetailsEntries = entries.Where(IsOnlyPackageDetails);
var allWork = new ConcurrentBag<CatalogCommitItem>(packageDetailsEntries);
var output = new ConcurrentBag<KeyValuePair<CatalogCommitItem, PackageDetailsCatalogLeaf>>();

using (_telemetryService.TrackCatalogLeafDownloadBatch(allWork.Count))
{
var tasks = Enumerable
.Range(0, _options.Value.MaxConcurrentCatalogLeafDownloads)
.Select(async x =>
{
await Task.Yield();
while (allWork.TryTake(out var work))
{
try
{
_logger.LogInformation(
"Downloading catalog leaf for {PackageId} {Version}: {Url}",
work.PackageIdentity.Id,
work.PackageIdentity.Version.ToNormalizedString(),
work.Uri.AbsoluteUri);

var leaf = await _catalogClient.GetPackageDetailsLeafAsync(work.Uri.AbsoluteUri);
output.Add(KeyValuePair.Create(work, leaf));
}
catch (Exception ex)
{
_logger.LogError(
0,
ex,
"An exception was thrown when fetching the package details leaf for {Id} {Version}. " +
"The URL is {Url}",
work.PackageIdentity.Id,
work.PackageIdentity.Version,
work.Uri.AbsoluteUri);
throw;
}
}
})
.ToList();

await Task.WhenAll(tasks);

return output.ToDictionary(
x => x.Key,
x => x.Value,
ReferenceEqualityComparer<CatalogCommitItem>.Default);
}
}

private static bool IsOnlyPackageDetails(CatalogCommitItem e)
{
return e.IsPackageDetails && !e.IsPackageDelete;
}

private async Task<IndexActions> GetPackageIdIndexActionsAsync(
IReadOnlyList<CatalogCommitItem> entries,
IReadOnlyDictionary<CatalogCommitItem, PackageDetailsCatalogLeaf> entryToLeaf)
Expand All @@ -242,19 +122,5 @@ private async Task<IndexActions> GetPackageIdIndexActionsAsync(
entries,
entryToLeaf);
}

private class Work
{
public Work(
string packageId,
IReadOnlyList<CatalogCommitItem> entries)
{
PackageId = packageId ?? throw new ArgumentNullException(nameof(packageId));
Entries = entries ?? throw new ArgumentNullException(nameof(entries));
}

public string PackageId { get; }
public IReadOnlyList<CatalogCommitItem> Entries { get; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.WindowsAzure.Storage;
using NuGet.Services.Metadata.Catalog;
using NuGet.Services.Metadata.Catalog.Persistence;
using NuGet.Services.V3;

namespace NuGet.Services.AzureSearch.Catalog2AzureSearch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@

using System;
using System.Collections.Generic;
using NuGet.Services.V3;

namespace NuGet.Services.AzureSearch.Catalog2AzureSearch
{
public class Catalog2AzureSearchConfiguration : AzureSearchJobConfiguration
public class Catalog2AzureSearchConfiguration : AzureSearchJobConfiguration, ICommitCollectorConfiguration
{
public int MaxConcurrentCatalogLeafDownloads { get; set; } = 64;
public bool CreateContainersAndIndexes { get; set; }
Expand Down
Loading

0 comments on commit d0f6356

Please sign in to comment.