diff --git a/NuGet.Services.Metadata.sln b/NuGet.Services.Metadata.sln
index 71ac542df..464d5e1d3 100644
--- a/NuGet.Services.Metadata.sln
+++ b/NuGet.Services.Metadata.sln
@@ -40,6 +40,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CatalogMetadataTests", "tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "V3PerPackage", "src\V3PerPackage\V3PerPackage.csproj", "{E76E73FA-4462-4F07-94C0-8B9CC413F696}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Protocol.Catalog", "src\NuGet.Protocol.Catalog\NuGet.Protocol.Catalog.csproj", "{D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NuGet.Protocol.Catalog.Tests", "tests\NuGet.Protocol.Catalog.Tests\NuGet.Protocol.Catalog.Tests.csproj", "{1F3BC053-796C-4A35-88F4-955A0F142197}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -190,6 +194,30 @@ Global
{E76E73FA-4462-4F07-94C0-8B9CC413F696}.Release|Mixed Platforms.Build.0 = Release|Any CPU
{E76E73FA-4462-4F07-94C0-8B9CC413F696}.Release|x64.ActiveCfg = Release|Any CPU
{E76E73FA-4462-4F07-94C0-8B9CC413F696}.Release|x64.Build.0 = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Debug|x64.Build.0 = Debug|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|x64.ActiveCfg = Release|Any CPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}.Release|x64.Build.0 = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Debug|x64.Build.0 = Debug|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|Any CPU.Build.0 = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|x64.ActiveCfg = Release|Any CPU
+ {1F3BC053-796C-4A35-88F4-955A0F142197}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -207,6 +235,8 @@ Global
{1745A383-D0BE-484B-81EB-27B20F6AC6C5} = {5DE01C58-D5F7-482F-8256-A8333064384C}
{34AABA7F-1FF7-4F4B-B1DB-D07AD4505DA4} = {F1C83FD9-A498-483E-ADFA-B55D82A14965}
{E76E73FA-4462-4F07-94C0-8B9CC413F696} = {C86C6DEE-84E1-4E4E-8868-6755D7A8E0E4}
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C} = {5DE01C58-D5F7-482F-8256-A8333064384C}
+ {1F3BC053-796C-4A35-88F4-955A0F142197} = {F1C83FD9-A498-483E-ADFA-B55D82A14965}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D3AB83E9-02B4-4FFA-A2D0-637F0B97E626}
diff --git a/build.ps1 b/build.ps1
index ddc8ff97c..999cf7700 100644
--- a/build.ps1
+++ b/build.ps1
@@ -9,7 +9,7 @@ param (
[string]$SemanticVersion = '1.0.0-zlocal',
[string]$Branch,
[string]$CommitSHA,
- [string]$BuildBranch = '5fd8377a9abf3ff411918dbb973948a6677432db'
+ [string]$BuildBranch = 'b5f9d1c89da96c462935e2195ceb00e69287b93e'
)
# For TeamCity - If any issue occurs, this script fails the build. - By default, TeamCity returns an exit code of 0 for all powershell scripts, even if they fail
@@ -79,7 +79,8 @@ Invoke-BuildStep 'Set version metadata in AssemblyInfo.cs' {
"src\Catalog\Properties\AssemblyInfo.g.cs", `
"src\NuGet.ApplicationInsights.Owin\Properties\AssemblyInfo.g.cs", `
"src\Ng\Properties\AssemblyInfo.g.cs", `
- "src\NuGet.Services.Metadata.Catalog.Monitoring\Properties\AssemblyInfo.g.cs"
+ "src\NuGet.Services.Metadata.Catalog.Monitoring\Properties\AssemblyInfo.g.cs", `
+ "src\NuGet.Protocol.Catalog\Properties\AssemblyInfo.g.cs"
Foreach ($assemblyInfo in $assemblyInfos) {
Set-VersionInfo -Path (Join-Path $PSScriptRoot $assemblyInfo) -Version $SimpleVersion -Branch $Branch -Commit $CommitSHA
diff --git a/src/Catalog/BatchProcessingException.cs b/src/Catalog/BatchProcessingException.cs
index 23a587b63..cdc135cf1 100644
--- a/src/Catalog/BatchProcessingException.cs
+++ b/src/Catalog/BatchProcessingException.cs
@@ -8,7 +8,7 @@ namespace NuGet.Services.Metadata.Catalog
public sealed class BatchProcessingException : Exception
{
public BatchProcessingException(Exception inner)
- : base(Strings.BatchProcessingFailure)
+ : base(Strings.BatchProcessingFailure, inner ?? throw new ArgumentNullException(nameof(inner)))
{
}
}
diff --git a/src/Catalog/CatalogCommit.cs b/src/Catalog/CatalogCommit.cs
new file mode 100644
index 000000000..366eefd98
--- /dev/null
+++ b/src/Catalog/CatalogCommit.cs
@@ -0,0 +1,51 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Globalization;
+using Newtonsoft.Json.Linq;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+    /// <summary>
+    /// Represents a single catalog commit.
+    /// </summary>
+    public sealed class CatalogCommit : IComparable
+    {
+        private CatalogCommit(DateTime commitTimeStamp, Uri uri)
+        {
+            Uri = uri;
+            CommitTimeStamp = commitTimeStamp;
+        }
+
+        /// <summary>
+        /// The value read from the commit's "commitTimeStamp" property.
+        /// This is the only value <see cref="CompareTo(object)" /> looks at.
+        /// </summary>
+        public DateTime CommitTimeStamp { get; }
+
+        /// <summary>
+        /// The value read from the commit's "@id" property.
+        /// </summary>
+        public Uri Uri { get; }
+
+        /// <summary>
+        /// Orders this commit relative to another <see cref="CatalogCommit" /> by commit timestamp.
+        /// </summary>
+        /// <param name="obj">The commit to compare against.</param>
+        /// <returns>The result of comparing the two <see cref="CommitTimeStamp" /> values.</returns>
+        /// <exception cref="ArgumentException">Thrown if <paramref name="obj" /> is <c>null</c>
+        /// or is not a <see cref="CatalogCommit" />.</exception>
+        public int CompareTo(object obj)
+        {
+            if (obj is CatalogCommit that)
+            {
+                return CommitTimeStamp.CompareTo(that.CommitTimeStamp);
+            }
+
+            var message = string.Format(
+                CultureInfo.InvariantCulture,
+                Strings.ArgumentMustBeInstanceOfType,
+                nameof(CatalogCommit));
+
+            throw new ArgumentException(message, nameof(obj));
+        }
+
+        /// <summary>
+        /// Builds a <see cref="CatalogCommit" /> from the "commitTimeStamp" and "@id" properties
+        /// of a JSON commit object.
+        /// </summary>
+        /// <param name="commit">The JSON object describing the commit.</param>
+        /// <returns>A new <see cref="CatalogCommit" />.</returns>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="commit" /> is <c>null</c>.</exception>
+        public static CatalogCommit Create(JObject commit)
+        {
+            if (commit == null)
+            {
+                throw new ArgumentNullException(nameof(commit));
+            }
+
+            var timeStamp = Utils.Deserialize(commit, "commitTimeStamp");
+            var id = Utils.Deserialize(commit, "@id");
+
+            return new CatalogCommit(timeStamp, id);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Catalog/CatalogCommitItem.cs b/src/Catalog/CatalogCommitItem.cs
new file mode 100644
index 000000000..a240f8e5f
--- /dev/null
+++ b/src/Catalog/CatalogCommitItem.cs
@@ -0,0 +1,108 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Newtonsoft.Json.Linq;
+using NuGet.Packaging.Core;
+using NuGet.Versioning;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+    /// <summary>
+    /// Represents a single item in a catalog commit.
+    /// </summary>
+    public sealed class CatalogCommitItem : IComparable
+    {
+        private const string _typeKeyword = "@type";
+
+        private CatalogCommitItem(
+            Uri uri,
+            string commitId,
+            DateTime commitTimeStamp,
+            IReadOnlyList types,
+            IReadOnlyList typeUris,
+            PackageIdentity packageIdentity)
+        {
+            Uri = uri;
+            CommitId = commitId;
+            CommitTimeStamp = commitTimeStamp;
+            Types = types;
+            TypeUris = typeUris;
+            PackageIdentity = packageIdentity;
+        }
+
+        /// <summary>The value of the item's "@id" property.</summary>
+        public Uri Uri { get; }
+
+        /// <summary>The value of the item's "commitTimeStamp" property; the sort key for <see cref="CompareTo(object)" />.</summary>
+        public DateTime CommitTimeStamp { get; }
+
+        /// <summary>The value of the item's "commitId" property.</summary>
+        public string CommitId { get; }
+
+        /// <summary>The identity built from the item's "nuget:id" and "nuget:version" properties.</summary>
+        public PackageIdentity PackageIdentity { get; }
+
+        /// <summary>The item's "@type" value(s), exactly as they appear in the item.</summary>
+        public IReadOnlyList Types { get; }
+
+        /// <summary>Each entry of <see cref="Types" /> after being passed through <c>Utils.Expand</c> with the context.</summary>
+        public IReadOnlyList TypeUris { get; }
+
+        /// <summary>
+        /// Orders this item relative to another <see cref="CatalogCommitItem" /> by commit timestamp.
+        /// </summary>
+        /// <param name="obj">The item to compare against.</param>
+        /// <returns>The result of comparing the two <see cref="CommitTimeStamp" /> values.</returns>
+        /// <exception cref="ArgumentException">Thrown if <paramref name="obj" /> is <c>null</c>
+        /// or is not a <see cref="CatalogCommitItem" />.</exception>
+        public int CompareTo(object obj)
+        {
+            if (obj is CatalogCommitItem that)
+            {
+                return CommitTimeStamp.CompareTo(that.CommitTimeStamp);
+            }
+
+            var message = string.Format(
+                CultureInfo.InvariantCulture,
+                Strings.ArgumentMustBeInstanceOfType,
+                nameof(CatalogCommitItem));
+
+            throw new ArgumentException(message, nameof(obj));
+        }
+
+        /// <summary>
+        /// Builds a <see cref="CatalogCommitItem" /> from a JSON commit item.
+        /// </summary>
+        /// <param name="context">The JSON context used to expand the item's type names.</param>
+        /// <param name="commitItem">The JSON object describing the commit item.</param>
+        /// <returns>A new <see cref="CatalogCommitItem" />.</returns>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="context" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="commitItem" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentException">Thrown if <paramref name="commitItem" /> yields no "@type" value.</exception>
+        public static CatalogCommitItem Create(JObject context, JObject commitItem)
+        {
+            if (context == null)
+            {
+                throw new ArgumentNullException(nameof(context));
+            }
+
+            if (commitItem == null)
+            {
+                throw new ArgumentNullException(nameof(commitItem));
+            }
+
+            // The properties are read in the same order as before so that a malformed item
+            // fails on the same property.
+            var timeStamp = Utils.Deserialize(commitItem, "commitTimeStamp");
+            var id = Utils.Deserialize(commitItem, "commitId");
+            var uri = Utils.Deserialize(commitItem, "@id");
+            var identity = new PackageIdentity(
+                Utils.Deserialize(commitItem, "nuget:id"),
+                new NuGetVersion(Utils.Deserialize(commitItem, "nuget:version")));
+            var types = GetTypes(commitItem).ToArray();
+
+            if (types.Length == 0)
+            {
+                var message = string.Format(
+                    CultureInfo.InvariantCulture,
+                    Strings.NonEmptyPropertyValueRequired,
+                    _typeKeyword);
+
+                throw new ArgumentException(message, nameof(commitItem));
+            }
+
+            var typeUris = types.Select(type => Utils.Expand(context, type)).ToArray();
+
+            return new CatalogCommitItem(uri, id, timeStamp, types, typeUris, identity);
+        }
+
+        // Yields the "@type" value(s) of the item: one string per array entry when the value
+        // is an array, a single string otherwise, and nothing when the property is absent.
+        private static IEnumerable GetTypes(JObject commitItem)
+        {
+            if (!commitItem.TryGetValue(_typeKeyword, out var token))
+            {
+                yield break;
+            }
+
+            if (token is JArray array)
+            {
+                foreach (JToken entry in array.Values())
+                {
+                    yield return entry.ToString();
+                }
+
+                yield break;
+            }
+
+            yield return token.ToString();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Catalog/CatalogCommitItemBatch.cs b/src/Catalog/CatalogCommitItemBatch.cs
new file mode 100644
index 000000000..4abbbcbb8
--- /dev/null
+++ b/src/Catalog/CatalogCommitItemBatch.cs
@@ -0,0 +1,44 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+    /// <summary>
+    /// Represents a group of <see cref="CatalogCommitItem" />.
+    /// Items may span multiple commits but are grouped on common criteria (e.g.:  lower-cased package ID).
+    /// </summary>
+    public sealed class CatalogCommitItemBatch
+    {
+        /// <summary>
+        /// Initializes a <see cref="CatalogCommitItemBatch" /> instance.
+        /// </summary>
+        /// <param name="items">An enumerable of <see cref="CatalogCommitItem" />.  Items may span multiple commits.
+        /// The sequence is enumerated exactly once.</param>
+        /// <param name="key">A unique key for all items in a batch.  This is used for parallelization and may be
+        /// <c>null</c> if parallelization is not used.</param>
+        /// <exception cref="ArgumentException">Thrown if <paramref name="items" /> is either <c>null</c> or empty.</exception>
+        public CatalogCommitItemBatch(IEnumerable items, string key = null)
+        {
+            // Snapshot before validating: calling Any() and then ToList() would enumerate a
+            // lazily evaluated sequence twice.
+            var list = items?.ToList();
+
+            if (list == null || list.Count == 0)
+            {
+                throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(items));
+            }
+
+            CommitTimeStamp = list.Min(item => item.CommitTimeStamp);
+            Key = key;
+
+            // Sorts with the items' own IComparable implementation.
+            list.Sort();
+
+            Items = list;
+        }
+
+        /// <summary>The earliest commit timestamp among <see cref="Items" />.</summary>
+        public DateTime CommitTimeStamp { get; }
+
+        /// <summary>The sorted items of this batch; never empty.</summary>
+        public IReadOnlyList Items { get; }
+
+        /// <summary>The key passed to the constructor; may be <c>null</c>.</summary>
+        public string Key { get; }
+    }
+}
\ No newline at end of file
diff --git a/src/Catalog/CatalogCommitItemBatchTask.cs b/src/Catalog/CatalogCommitItemBatchTask.cs
new file mode 100644
index 000000000..3d8e58401
--- /dev/null
+++ b/src/Catalog/CatalogCommitItemBatchTask.cs
@@ -0,0 +1,68 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Threading.Tasks;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+    /// <summary>
+    /// Represents an asynchronous task associated with catalog changes for a specific
+    /// batch key and potentially spanning multiple commits.
+    /// </summary>
+    /// <remarks>
+    /// Equality and hashing are based solely on the batch's key.
+    /// </remarks>
+    public sealed class CatalogCommitItemBatchTask : IEquatable
+    {
+        /// <summary>
+        /// Initializes a <see cref="CatalogCommitItemBatchTask" /> instance.
+        /// </summary>
+        /// <param name="batch">A <see cref="CatalogCommitItemBatch" />.</param>
+        /// <param name="task">A <see cref="System.Threading.Tasks.Task" /> tracking completion of
+        /// <paramref name="batch" /> processing.</param>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="batch" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentException">Thrown if the key of <paramref name="batch" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="task" /> is <c>null</c>.</exception>
+        public CatalogCommitItemBatchTask(CatalogCommitItemBatch batch, Task task)
+        {
+            Batch = batch ?? throw new ArgumentNullException(nameof(batch));
+
+            if (batch.Key == null)
+            {
+                throw new ArgumentException(Strings.ArgumentMustNotBeNull, $"{nameof(batch)}.{nameof(batch.Key)}");
+            }
+
+            Task = task ?? throw new ArgumentNullException(nameof(task));
+        }
+
+        /// <summary>The batch being processed; its key is never <c>null</c>.</summary>
+        public CatalogCommitItemBatch Batch { get; }
+
+        /// <summary>The task tracking completion of <see cref="Batch" /> processing.</summary>
+        public Task Task { get; }
+
+        /// <summary>Returns the hash code of the batch key.</summary>
+        public override int GetHashCode() => Batch.Key.GetHashCode();
+
+        /// <summary>Returns <c>true</c> if <paramref name="obj" /> is a
+        /// <see cref="CatalogCommitItemBatchTask" /> with the same batch key.</summary>
+        public override bool Equals(object obj)
+        {
+            return obj is CatalogCommitItemBatchTask candidate && Equals(candidate);
+        }
+
+        /// <summary>Returns <c>true</c> if <paramref name="other" /> is not <c>null</c> and has the same batch key.</summary>
+        public bool Equals(CatalogCommitItemBatchTask other)
+        {
+            return !ReferenceEquals(other, null) && string.Equals(Batch.Key, other.Batch.Key);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/Catalog/CatalogCommitUtilities.cs b/src/Catalog/CatalogCommitUtilities.cs
new file mode 100644
index 000000000..a4b0cc449
--- /dev/null
+++ b/src/Catalog/CatalogCommitUtilities.cs
@@ -0,0 +1,323 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Globalization;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json.Linq;
+using ExceptionUtilities = NuGet.Common.ExceptionUtilities;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ public static class CatalogCommitUtilities
+ {
+ private static readonly EventId _eventId = new EventId(id: 0);
+
+        /// <summary>
+        /// Creates an enumerable of <see cref="CatalogCommitItemBatch" /> instances.
+        /// </summary>
+        /// <remarks>
+        /// A <see cref="CatalogCommitItemBatch" /> instance contains only the latest commit for each package identity.
+        /// </remarks>
+        /// <param name="catalogItems">An enumerable of <see cref="CatalogCommitItem" />.</param>
+        /// <param name="getCatalogCommitItemKey">A function that returns a key for a <see cref="CatalogCommitItem" />.</param>
+        /// <returns>An enumerable of <see cref="CatalogCommitItemBatch" /> with no ordering guarantee.</returns>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="catalogItems" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="getCatalogCommitItemKey" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentException">Thrown if, after older commits have been dropped, items sharing a
+        /// commit timestamp carry different commit IDs.</exception>
+        public static IEnumerable CreateCommitItemBatches(
+            IEnumerable catalogItems,
+            GetCatalogCommitItemKey getCatalogCommitItemKey)
+        {
+            if (catalogItems == null)
+            {
+                throw new ArgumentNullException(nameof(catalogItems));
+            }
+
+            if (getCatalogCommitItemKey == null)
+            {
+                throw new ArgumentNullException(nameof(getCatalogCommitItemKey));
+            }
+
+            var catalogItemsGroups = catalogItems
+                .GroupBy(catalogItem => getCatalogCommitItemKey(catalogItem));
+
+            var batches = new List();
+
+            foreach (var catalogItemsGroup in catalogItemsGroups)
+            {
+                // Within a key, collapse each package identity (case-insensitive ID + normalized
+                // version) to its most recent commit.  OrderBy is stable, so Last() picks the
+                // final item among equal timestamps.
+                var catalogItemsWithOnlyLatestCommitForEachPackageIdentity = catalogItemsGroup
+                    .GroupBy(commitItem => new
+                    {
+                        PackageId = commitItem.PackageIdentity.Id.ToLowerInvariant(),
+                        PackageVersion = commitItem.PackageIdentity.Version.ToNormalizedString().ToLowerInvariant()
+                    })
+                    .Select(group => group.OrderBy(item => item.CommitTimeStamp).Last())
+                    .ToArray();
+
+                batches.Add(
+                    new CatalogCommitItemBatch(
+                        catalogItemsWithOnlyLatestCommitForEachPackageIdentity,
+                        catalogItemsGroup.Key));
+            }
+
+            // Assert only after skipping older commits for each package identity to reduce the likelihood
+            // of unnecessary failures.
+            AssertNotMoreThanOneCommitIdPerCommitTimeStamp(batches, nameof(catalogItems));
+
+            return batches;
+        }
+
+        /// <summary>
+        /// Starts processing unprocessed batches, up to the concurrency limit, unless a batch
+        /// that is already being tracked has faulted or been canceled.
+        /// </summary>
+        /// <remarks>
+        /// Both lists are modified in place: each started batch is removed from the front of
+        /// <paramref name="unprocessedBatches" /> and a task wrapper for it is appended to
+        /// <paramref name="processingBatches" />.
+        /// </remarks>
+        /// <param name="client">The HTTP client handed to <paramref name="processCommitItemBatchAsync" />.</param>
+        /// <param name="context">The JSON context handed to <paramref name="processCommitItemBatchAsync" />.</param>
+        /// <param name="unprocessedBatches">Batches that have not been started yet.</param>
+        /// <param name="processingBatches">Batches that have been started and not yet removed by the caller.</param>
+        /// <param name="maxConcurrentBatches">The maximum number of incomplete batch tasks; must be at least 1.</param>
+        /// <param name="processCommitItemBatchAsync">The function that starts processing a batch.</param>
+        /// <param name="cancellationToken">A cancellation token handed to <paramref name="processCommitItemBatchAsync" />.</param>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="client" />, <paramref name="context" />,
+        /// <paramref name="unprocessedBatches" />, <paramref name="processingBatches" />, or
+        /// <paramref name="processCommitItemBatchAsync" /> is <c>null</c>.</exception>
+        /// <exception cref="ArgumentOutOfRangeException">Thrown if <paramref name="maxConcurrentBatches" />
+        /// is less than 1.</exception>
+        public static void StartProcessingBatchesIfNoFailures(
+            CollectorHttpClient client,
+            JToken context,
+            List unprocessedBatches,
+            List processingBatches,
+            int maxConcurrentBatches,
+            ProcessCommitItemBatchAsync processCommitItemBatchAsync,
+            CancellationToken cancellationToken)
+        {
+            if (client == null)
+            {
+                throw new ArgumentNullException(nameof(client));
+            }
+
+            if (context == null)
+            {
+                throw new ArgumentNullException(nameof(context));
+            }
+
+            if (unprocessedBatches == null)
+            {
+                throw new ArgumentNullException(nameof(unprocessedBatches));
+            }
+
+            if (processingBatches == null)
+            {
+                throw new ArgumentNullException(nameof(processingBatches));
+            }
+
+            if (maxConcurrentBatches < 1)
+            {
+                // Format with the invariant culture, like every other message built in this file.
+                throw new ArgumentOutOfRangeException(
+                    nameof(maxConcurrentBatches),
+                    maxConcurrentBatches,
+                    string.Format(CultureInfo.InvariantCulture, Strings.ArgumentOutOfRange, 1, int.MaxValue));
+            }
+
+            if (processCommitItemBatchAsync == null)
+            {
+                throw new ArgumentNullException(nameof(processCommitItemBatchAsync));
+            }
+
+            var hasAnyBatchFailed = processingBatches.Any(batch => batch.Task.IsFaulted || batch.Task.IsCanceled);
+
+            if (hasAnyBatchFailed)
+            {
+                return;
+            }
+
+            // Only tasks that have not completed count against the limit.  The result may be
+            // zero or negative, in which case nothing is started.
+            var batchesToEnqueue = Math.Min(
+                maxConcurrentBatches - processingBatches.Count(batch => !batch.Task.IsCompleted),
+                unprocessedBatches.Count);
+
+            for (var i = 0; i < batchesToEnqueue; ++i)
+            {
+                var batch = unprocessedBatches[0];
+
+                // Dequeue before starting so the batch is never started twice.
+                unprocessedBatches.RemoveAt(0);
+
+                var task = processCommitItemBatchAsync(
+                    client,
+                    context,
+                    batch.Key,
+                    batch,
+                    lastBatch: null,
+                    cancellationToken: cancellationToken);
+                var batchTask = new CatalogCommitItemBatchTask(batch, task);
+
+                processingBatches.Add(batchTask);
+            }
+        }
+
+ /// <summary>
+ /// Fetches catalog commits and, page by page, creates batches for the commit items,
+ /// processes them through <paramref name="processCommitItemBatchAsync" />, and advances
+ /// and saves <paramref name="front" /> after each page whose batches all succeeded.
+ /// </summary>
+ /// <remarks>
+ /// This method performs no argument validation of its own.
+ /// </remarks>
+ /// <returns>A task whose result is <c>true</c> if at least one page produced batches;
+ /// otherwise <c>false</c>.</returns>
+ /// <exception cref="BatchProcessingException">Thrown after all started batch tasks of a page
+ /// have completed if any of them faulted or was canceled.</exception>
+ internal static async Task ProcessCatalogCommitsAsync(
+ CollectorHttpClient client,
+ ReadWriteCursor front,
+ ReadCursor back,
+ FetchCatalogCommitsAsync fetchCatalogCommitsAsync,
+ CreateCommitItemBatchesAsync createCommitItemBatchesAsync,
+ ProcessCommitItemBatchAsync processCommitItemBatchAsync,
+ int maxConcurrentBatches,
+ ILogger logger,
+ CancellationToken cancellationToken)
+ {
+ IEnumerable rootItems = await fetchCatalogCommitsAsync(client, front, cancellationToken);
+
+ var hasAnyBatchFailed = false;
+ var hasAnyBatchBeenProcessed = false;
+
+ foreach (CatalogCommit rootItem in rootItems)
+ {
+ // Each fetched commit's URI is downloaded as a page of items.
+ JObject page = await client.GetJObjectAsync(rootItem.Uri, cancellationToken);
+ var context = (JObject)page["@context"];
+ CatalogCommitItemBatch[] batches = await CreateBatchesForAllAvailableItemsInPageAsync(
+ front,
+ back,
+ page,
+ context,
+ createCommitItemBatchesAsync);
+
+ // No batches were produced for this page; the cursor is left untouched.
+ if (!batches.Any())
+ {
+ continue;
+ }
+
+ hasAnyBatchBeenProcessed = true;
+
+ // Captured up front: this becomes the new cursor value if the whole page succeeds.
+ DateTime maxCommitTimeStamp = GetMaxCommitTimeStamp(batches);
+ var unprocessedBatches = batches.ToList();
+ var processingBatches = new List();
+ var exceptions = new List();
+
+ StartProcessingBatchesIfNoFailures(
+ client,
+ context,
+ unprocessedBatches,
+ processingBatches,
+ maxConcurrentBatches,
+ processCommitItemBatchAsync,
+ cancellationToken);
+
+ // Loop until every started task has completed and been removed from the list.
+ while (processingBatches.Any())
+ {
+ // Task.WhenAny rejects an empty collection, so fall back to an already-completed
+ // task when nothing is still running.
+ var activeTasks = processingBatches.Where(batch => !batch.Task.IsCompleted)
+ .Select(batch => batch.Task)
+ .DefaultIfEmpty(Task.CompletedTask);
+
+ await Task.WhenAny(activeTasks);
+
+ for (var i = 0; i < processingBatches.Count; ++i)
+ {
+ var batch = processingBatches[i];
+
+ if (batch.Task.IsFaulted || batch.Task.IsCanceled)
+ {
+ hasAnyBatchFailed = true;
+
+ // A task without an exception (presumably the canceled case) still marks the
+ // run as failed but contributes nothing to the exception list.
+ if (batch.Task.Exception != null)
+ {
+ var exception = ExceptionUtilities.Unwrap(batch.Task.Exception);
+
+ exceptions.Add(exception);
+ }
+ }
+
+ if (batch.Task.IsCompleted)
+ {
+ // Step the index back so the element that slides into slot i is not skipped.
+ processingBatches.RemoveAt(i);
+ --i;
+ }
+ }
+
+ // After a failure no further batches are started; the loop only drains what is running.
+ if (!hasAnyBatchFailed)
+ {
+ StartProcessingBatchesIfNoFailures(
+ client,
+ context,
+ unprocessedBatches,
+ processingBatches,
+ maxConcurrentBatches,
+ processCommitItemBatchAsync,
+ cancellationToken);
+ }
+ }
+
+ if (hasAnyBatchFailed)
+ {
+ foreach (var exception in exceptions)
+ {
+ logger.LogError(_eventId, exception, Strings.BatchProcessingFailure);
+ }
+
+ // A single failure is surfaced directly; anything else (including zero collected
+ // exceptions) is wrapped in an AggregateException.
+ var innerException = exceptions.Count == 1 ? exceptions.Single() : new AggregateException(exceptions);
+
+ throw new BatchProcessingException(innerException);
+ }
+
+ // Progress is persisted only after every batch of the page has succeeded.
+ front.Value = maxCommitTimeStamp;
+
+ await front.SaveAsync(cancellationToken);
+
+ // The doubled braces emit a literal {0} placeholder, which TraceInformation fills with front.
+ Trace.TraceInformation($"{nameof(CatalogCommitUtilities)}.{nameof(ProcessCatalogCommitsAsync)} " +
+ $"{nameof(front)}.{nameof(front.Value)} saved since timestamp changed from previous: {{0}}", front);
+ }
+
+ return hasAnyBatchBeenProcessed;
+ }
+
+        /// <summary>
+        /// Produces a key for a commit item: its package ID, lower-cased with the invariant culture.
+        /// </summary>
+        /// <param name="item">The commit item to derive the key from.</param>
+        /// <returns>The lower-cased package ID.</returns>
+        /// <exception cref="ArgumentNullException">Thrown if <paramref name="item" /> is <c>null</c>.</exception>
+        public static string GetPackageIdKey(CatalogCommitItem item)
+        {
+            if (item != null)
+            {
+                return item.PackageIdentity.Id.ToLowerInvariant();
+            }
+
+            throw new ArgumentNullException(nameof(item));
+        }
+
+        // Turns the "items" of a catalog page into commit items, keeps those whose commit
+        // timestamp lies in (front, back], hands them to the batch factory, and returns the
+        // resulting batches ordered by batch commit timestamp.
+        private static async Task CreateBatchesForAllAvailableItemsInPageAsync(
+            ReadWriteCursor front,
+            ReadCursor back,
+            JObject page,
+            JObject context,
+            CreateCommitItemBatchesAsync createCommitItemBatchesAsync)
+        {
+            // Still a deferred query: items are created and filtered when the factory enumerates it.
+            var itemsInWindow =
+                from commitItem in page["items"].Select(token => CatalogCommitItem.Create(context, (JObject)token))
+                where commitItem.CommitTimeStamp > front.Value && commitItem.CommitTimeStamp <= back.Value
+                select commitItem;
+
+            var unorderedBatches = await createCommitItemBatchesAsync(itemsInWindow);
+            var orderedBatches = unorderedBatches.OrderBy(batch => batch.CommitTimeStamp);
+
+            return orderedBatches.ToArray();
+        }
+
+        // Returns the latest commit timestamp found among the items of all given batches.
+        private static DateTime GetMaxCommitTimeStamp(CatalogCommitItemBatch[] batches)
+        {
+            var allItems = batches.SelectMany(batch => batch.Items);
+
+            return allItems.Max(item => item.CommitTimeStamp);
+        }
+
+        // Throws an ArgumentException, attributed to parameterName, when items that share a
+        // commit timestamp carry more than one distinct commit ID.  The message lists every
+        // item of every offending timestamp.
+        private static void AssertNotMoreThanOneCommitIdPerCommitTimeStamp(
+            IEnumerable batches,
+            string parameterName)
+        {
+            // Materialize the grouping so it is evaluated once, not once for the emptiness
+            // check and again while building the message.
+            var commitsWithSameTimeStampButDifferentCommitIds = batches
+                .SelectMany(batch => batch.Items)
+                .GroupBy(commitItem => commitItem.CommitTimeStamp)
+                .Where(group => group.Select(item => item.CommitId).Distinct().Count() > 1)
+                .ToArray();
+
+            if (commitsWithSameTimeStampButDifferentCommitIds.Length == 0)
+            {
+                return;
+            }
+
+            var commits = commitsWithSameTimeStampButDifferentCommitIds
+                .SelectMany(group => group)
+                .Select(commit => $"{{ CommitId = {commit.CommitId}, CommitTimeStamp = {commit.CommitTimeStamp.ToString("O")} }}");
+
+            throw new ArgumentException(
+                string.Format(
+                    CultureInfo.InvariantCulture,
+                    Strings.MultipleCommitIdsForSameCommitTimeStamp,
+                    string.Join(", ", commits)),
+                parameterName);
+        }
+ }
+}
\ No newline at end of file
diff --git a/src/Catalog/CatalogIndexEntry.cs b/src/Catalog/CatalogIndexEntry.cs
index 3ece5ce4a..48112b8dc 100644
--- a/src/Catalog/CatalogIndexEntry.cs
+++ b/src/Catalog/CatalogIndexEntry.cs
@@ -3,51 +3,44 @@
using System;
using System.Collections.Generic;
-using System.Globalization;
using System.Linq;
using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
+using NuGet.Packaging.Core;
using NuGet.Versioning;
namespace NuGet.Services.Metadata.Catalog
{
public sealed class CatalogIndexEntry : IComparable
{
- private static readonly CatalogIndexEntryDateComparer _commitTimeStampComparer = new CatalogIndexEntryDateComparer();
-
[JsonConstructor]
private CatalogIndexEntry()
{
Types = Enumerable.Empty();
}
- public CatalogIndexEntry(Uri uri, string type, string commitId, DateTime commitTs, string id, NuGetVersion version)
+ public CatalogIndexEntry(
+ Uri uri,
+ string type,
+ string commitId,
+ DateTime commitTs,
+ PackageIdentity packageIdentity)
{
- Uri = uri ?? throw new ArgumentNullException(nameof(uri));
-
if (string.IsNullOrWhiteSpace(type))
{
- throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(type));
+ throw new ArgumentException(Strings.ArgumentMustNotBeNullEmptyOrWhitespace, nameof(type));
}
- Types = new[] { type };
- IsDelete = type == "nuget:PackageDelete";
-
- if (string.IsNullOrWhiteSpace(commitId))
- {
- throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(commitId));
- }
-
- CommitId = commitId;
- CommitTimeStamp = commitTs;
-
- if (string.IsNullOrWhiteSpace(id))
- {
- throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(id));
- }
+ Initialize(uri, new[] { type }, commitId, commitTs, packageIdentity);
+ }
- Id = id;
- Version = version ?? throw new ArgumentNullException(nameof(version));
+ public CatalogIndexEntry(
+ Uri uri,
+ IReadOnlyList types,
+ string commitId,
+ DateTime commitTs,
+ PackageIdentity packageIdentity)
+ {
+ Initialize(uri, types, commitId, commitTs, packageIdentity);
}
[JsonProperty("@id")]
@@ -76,7 +69,13 @@ public CatalogIndexEntry(Uri uri, string type, string commitId, DateTime commitT
public NuGetVersion Version { get; private set; }
[JsonIgnore]
- public bool IsDelete { get; }
+ public bool IsDelete
+ {
+ get
+ {
+ return Types.Any(type => type == "nuget:PackageDelete");
+ }
+ }
public int CompareTo(CatalogIndexEntry other)
{
@@ -85,37 +84,60 @@ public int CompareTo(CatalogIndexEntry other)
throw new ArgumentNullException(nameof(other));
}
- return _commitTimeStampComparer.Compare(this, other);
+ return CommitTimeStamp.CompareTo(other.CommitTimeStamp);
}
- public static CatalogIndexEntry Create(JToken token)
+ public static CatalogIndexEntry Create(CatalogCommitItem commitItem)
{
- if (token == null)
+ if (commitItem == null)
{
- throw new ArgumentNullException(nameof(token));
+ throw new ArgumentNullException(nameof(commitItem));
}
- var uri = new Uri(token["@id"].ToString());
- var type = token["@type"].ToString();
- var commitId = token["commitId"].ToString();
- var commitTimeStamp = DateTime.ParseExact(
- token["commitTimeStamp"].ToString(),
- "yyyy-MM-ddTHH:mm:ss.FFFFFFFZ",
- DateTimeFormatInfo.CurrentInfo,
- DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
-
- var packageId = token["nuget:id"].ToString();
- var packageVersion = NuGetVersion.Parse(token["nuget:version"].ToString());
-
- return new CatalogIndexEntry(uri, type, commitId, commitTimeStamp, packageId, packageVersion);
+ return new CatalogIndexEntry(
+ commitItem.Uri,
+ commitItem.Types,
+ commitItem.CommitId,
+ commitItem.CommitTimeStamp,
+ commitItem.PackageIdentity);
}
- }
- public class CatalogIndexEntryDateComparer : IComparer
- {
- public int Compare(CatalogIndexEntry x, CatalogIndexEntry y)
+ private void Initialize(
+ Uri uri,
+ IReadOnlyList types,
+ string commitId,
+ DateTime commitTs,
+ PackageIdentity packageIdentity)
{
- return x.CommitTimeStamp.CompareTo(y.CommitTimeStamp);
+ Uri = uri ?? throw new ArgumentNullException(nameof(uri));
+
+ if (types == null || !types.Any())
+ {
+ throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(types));
+ }
+
+ if (types.Any(type => string.IsNullOrWhiteSpace(type)))
+ {
+ throw new ArgumentException(Strings.ArgumentMustNotBeNullEmptyOrWhitespace, nameof(types));
+ }
+
+ Types = types;
+
+ if (string.IsNullOrWhiteSpace(commitId))
+ {
+ throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(commitId));
+ }
+
+ CommitId = commitId;
+ CommitTimeStamp = commitTs;
+
+ if (packageIdentity == null)
+ {
+ throw new ArgumentNullException(nameof(packageIdentity));
+ }
+
+ Id = packageIdentity.Id;
+ Version = packageIdentity.Version;
}
}
}
\ No newline at end of file
diff --git a/src/Catalog/CatalogIndexReader.cs b/src/Catalog/CatalogIndexReader.cs
index 0aaa60d02..e33b5a4a9 100644
--- a/src/Catalog/CatalogIndexReader.cs
+++ b/src/Catalog/CatalogIndexReader.cs
@@ -9,6 +9,7 @@
using System.Net;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
+using NuGet.Packaging.Core;
using NuGet.Versioning;
namespace NuGet.Services.Metadata.Catalog
@@ -84,6 +85,7 @@ private async Task ProcessPageUris(ConcurrentBag pageUriBag, ConcurrentBag<
var commitId = interner.Intern(item["commitId"].ToString());
var nugetId = interner.Intern(item["nuget:id"].ToString());
var nugetVersion = interner.Intern(item["nuget:version"].ToString());
+ var packageIdentity = new PackageIdentity(nugetId, NuGetVersion.Parse(nugetVersion));
// No string is directly operated on here.
var commitTimeStamp = item["commitTimeStamp"].ToObject();
@@ -93,12 +95,11 @@ private async Task ProcessPageUris(ConcurrentBag pageUriBag, ConcurrentBag<
type,
commitId,
commitTimeStamp,
- nugetId,
- NuGetVersion.Parse(nugetVersion));
+ packageIdentity);
entries.Add(entry);
}
}
}
}
-}
+}
\ No newline at end of file
diff --git a/src/Catalog/CollectorBase.cs b/src/Catalog/CollectorBase.cs
index ea49ee7fb..952431a81 100644
--- a/src/Catalog/CollectorBase.cs
+++ b/src/Catalog/CollectorBase.cs
@@ -13,17 +13,20 @@ public abstract class CollectorBase
{
protected readonly ITelemetryService _telemetryService;
private readonly Func _handlerFunc;
+ private readonly IHttpRetryStrategy _httpRetryStrategy;
private readonly TimeSpan? _httpClientTimeout;
public CollectorBase(
Uri index,
ITelemetryService telemetryService,
Func handlerFunc = null,
- TimeSpan? httpClientTimeout = null)
+ TimeSpan? httpClientTimeout = null,
+ IHttpRetryStrategy httpRetryStrategy = null)
{
_telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
_handlerFunc = handlerFunc;
_httpClientTimeout = httpClientTimeout;
+ _httpRetryStrategy = httpRetryStrategy;
Index = index ?? throw new ArgumentNullException(nameof(index));
}
@@ -54,7 +57,7 @@ public async Task RunAsync(ReadWriteCursor front, ReadCursor back, Cancell
handler = _handlerFunc();
}
- using (CollectorHttpClient client = new CollectorHttpClient(handler))
+ using (CollectorHttpClient client = new CollectorHttpClient(handler, _httpRetryStrategy))
{
if (_httpClientTimeout.HasValue)
{
diff --git a/src/Catalog/CollectorHttpClient.cs b/src/Catalog/CollectorHttpClient.cs
index b820e1ef2..ead8a14e8 100644
--- a/src/Catalog/CollectorHttpClient.cs
+++ b/src/Catalog/CollectorHttpClient.cs
@@ -15,18 +15,18 @@ namespace NuGet.Services.Metadata.Catalog
public class CollectorHttpClient : HttpClient
{
private int _requestCount;
- private readonly RetryWithExponentialBackoff _retryStrategy;
+ private readonly IHttpRetryStrategy _retryStrategy;
public CollectorHttpClient()
: this(new WebRequestHandler { AllowPipelining = true })
{
}
- public CollectorHttpClient(HttpMessageHandler handler)
+ public CollectorHttpClient(HttpMessageHandler handler, IHttpRetryStrategy retryStrategy = null)
: base(handler ?? new WebRequestHandler { AllowPipelining = true })
{
_requestCount = 0;
- _retryStrategy = new RetryWithExponentialBackoff();
+ _retryStrategy = retryStrategy ?? new RetryWithExponentialBackoff();
}
public int RequestCount
diff --git a/src/Catalog/CommitCollector.cs b/src/Catalog/CommitCollector.cs
index d483d7d1f..0418e070d 100644
--- a/src/Catalog/CommitCollector.cs
+++ b/src/Catalog/CommitCollector.cs
@@ -18,8 +18,9 @@ public CommitCollector(
Uri index,
ITelemetryService telemetryService,
Func handlerFunc = null,
- TimeSpan? httpClientTimeout = null)
- : base(index, telemetryService, handlerFunc, httpClientTimeout)
+ TimeSpan? httpClientTimeout = null,
+ IHttpRetryStrategy httpRetryStrategy = null)
+ : base(index, telemetryService, handlerFunc, httpClientTimeout, httpRetryStrategy)
{
}
@@ -29,19 +30,19 @@ protected override async Task FetchAsync(
ReadCursor back,
CancellationToken cancellationToken)
{
- IEnumerable catalogItems = await FetchCatalogItemsAsync(client, front, cancellationToken);
+ IEnumerable commits = await FetchCatalogCommitsAsync(client, front, cancellationToken);
bool acceptNextBatch = false;
- foreach (CatalogItem catalogItem in catalogItems)
+ foreach (CatalogCommit commit in commits)
{
- JObject page = await client.GetJObjectAsync(catalogItem.Uri, cancellationToken);
+ JObject page = await client.GetJObjectAsync(commit.Uri, cancellationToken);
JToken context = null;
page.TryGetValue("@context", out context);
var batches = await CreateBatchesAsync(page["items"]
- .Select(item => new CatalogItem((JObject)item))
+ .Select(item => CatalogCommitItem.Create((JObject)context, (JObject)item))
.Where(item => item.CommitTimeStamp > front.Value && item.CommitTimeStamp <= back.Value));
var orderedBatches = batches
@@ -70,7 +71,7 @@ protected override async Task FetchAsync(
{
acceptNextBatch = await OnProcessBatchAsync(
client,
- batch.Items.Select(item => item.Value),
+ batch.Items,
context,
batch.CommitTimeStamp,
batch.CommitTimeStamp == lastBatch.CommitTimeStamp,
@@ -104,7 +105,7 @@ protected override async Task FetchAsync(
return acceptNextBatch;
}
- protected async Task> FetchCatalogItemsAsync(
+ protected async Task> FetchCatalogCommitsAsync(
CollectorHttpClient client,
ReadWriteCursor front,
CancellationToken cancellationToken)
@@ -118,67 +119,30 @@ protected async Task> FetchCatalogItemsAsync(
root = await client.GetJObjectAsync(Index, cancellationToken);
}
- IEnumerable rootItems = root["items"]
- .Select(item => new CatalogItem((JObject)item))
+ IEnumerable commits = root["items"]
+ .Select(item => CatalogCommit.Create((JObject)item))
.Where(item => item.CommitTimeStamp > front.Value)
.OrderBy(item => item.CommitTimeStamp);
- return rootItems;
+ return commits;
}
- protected virtual Task> CreateBatchesAsync(IEnumerable catalogItems)
+ protected virtual Task> CreateBatchesAsync(IEnumerable catalogItems)
{
var batches = catalogItems
.GroupBy(item => item.CommitTimeStamp)
.OrderBy(group => group.Key)
- .Select(group => new CatalogItemBatch(group.Key, group));
+ .Select(group => new CatalogCommitItemBatch(group));
return Task.FromResult(batches);
}
protected abstract Task OnProcessBatchAsync(
CollectorHttpClient client,
- IEnumerable items,
+ IEnumerable items,
JToken context,
DateTime commitTimeStamp,
bool isLastBatch,
CancellationToken cancellationToken);
-
- protected class CatalogItemBatch : IComparable
- {
- public CatalogItemBatch(DateTime commitTimeStamp, IEnumerable items)
- {
- CommitTimeStamp = commitTimeStamp;
- Items = items.ToList();
- Items.Sort();
- }
-
- public DateTime CommitTimeStamp { get; }
- public List Items { get; }
-
- public int CompareTo(object obj)
- {
- return CommitTimeStamp.CompareTo(((CatalogItem)obj).CommitTimeStamp);
- }
- }
-
- protected class CatalogItem : IComparable
- {
- public CatalogItem(JObject value)
- {
- CommitTimeStamp = value["commitTimeStamp"].ToObject();
- Uri = value["@id"].ToObject();
- Value = value;
- }
-
- public DateTime CommitTimeStamp { get; }
- public Uri Uri { get; }
- public JObject Value { get; }
-
- public int CompareTo(object obj)
- {
- return CommitTimeStamp.CompareTo(((CatalogItem)obj).CommitTimeStamp);
- }
- }
}
}
\ No newline at end of file
diff --git a/src/Catalog/CreateCommitItemBatchesAsync.cs b/src/Catalog/CreateCommitItemBatchesAsync.cs
new file mode 100644
index 000000000..d95efee77
--- /dev/null
+++ b/src/Catalog/CreateCommitItemBatchesAsync.cs
@@ -0,0 +1,11 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ internal delegate Task> CreateCommitItemBatchesAsync(
+ IEnumerable catalogItems);
+}
\ No newline at end of file
diff --git a/src/Catalog/Dnx/DnxCatalogCollector.cs b/src/Catalog/Dnx/DnxCatalogCollector.cs
index 3af28b824..c3448287d 100644
--- a/src/Catalog/Dnx/DnxCatalogCollector.cs
+++ b/src/Catalog/Dnx/DnxCatalogCollector.cs
@@ -7,6 +7,7 @@
using System.IO;
using System.IO.Compression;
using System.Linq;
+using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
@@ -24,7 +25,8 @@ public class DnxCatalogCollector : CommitCollector
private readonly IAzureStorage _sourceStorage;
private readonly DnxMaker _dnxMaker;
private readonly ILogger _logger;
- private readonly int _maxDegreeOfParallelism;
+ private readonly int _maxConcurrentBatches;
+ private readonly int _maxConcurrentCommitItemsWithinBatch;
private readonly Uri _contentBaseAddress;
public DnxCatalogCollector(
@@ -52,26 +54,57 @@ public DnxCatalogCollector(
string.Format(Strings.ArgumentOutOfRange, 1, int.MaxValue));
}
- _maxDegreeOfParallelism = maxDegreeOfParallelism;
+ // Find two factors which are close or equal to each other.
+ var squareRoot = Math.Sqrt(maxDegreeOfParallelism);
+
+ // If the max degree of parallelism is a perfect square, great.
+ // Otherwise, prefer a greater degree of parallelism in batches than commit items within a batch.
+ _maxConcurrentBatches = Convert.ToInt32(Math.Ceiling(squareRoot));
+ _maxConcurrentCommitItemsWithinBatch = Convert.ToInt32(maxDegreeOfParallelism / _maxConcurrentBatches);
+
+ ServicePointManager.DefaultConnectionLimit = _maxConcurrentBatches * _maxConcurrentCommitItemsWithinBatch;
+ }
+
+ protected override Task> CreateBatchesAsync(
+ IEnumerable catalogItems)
+ {
+ var batches = CatalogCommitUtilities.CreateCommitItemBatches(
+ catalogItems,
+ CatalogCommitUtilities.GetPackageIdKey);
+
+ return Task.FromResult(batches);
+ }
+
+ protected override Task FetchAsync(
+ CollectorHttpClient client,
+ ReadWriteCursor front,
+ ReadCursor back,
+ CancellationToken cancellationToken)
+ {
+ return CatalogCommitUtilities.ProcessCatalogCommitsAsync(
+ client,
+ front,
+ back,
+ FetchCatalogCommitsAsync,
+ CreateBatchesAsync,
+ ProcessBatchAsync,
+ _maxConcurrentBatches,
+ _logger,
+ cancellationToken);
}
protected override async Task OnProcessBatchAsync(
CollectorHttpClient client,
- IEnumerable items,
+ IEnumerable items,
JToken context,
DateTime commitTimeStamp,
bool isLastBatch,
CancellationToken cancellationToken)
{
- var catalogEntries = items.Select(
- item => new CatalogEntry(
- item["nuget:id"].ToString().ToLowerInvariant(),
- NuGetVersionUtility.NormalizeVersion(item["nuget:version"].ToString()).ToLowerInvariant(),
- item["@type"].ToString().Replace("nuget:", Schema.Prefixes.NuGet),
- item))
+ var catalogEntries = items.Select(item => CatalogEntry.Create(item))
.ToList();
- // Sanity check: a single catalog batch should not contain multiple entries for the same package ID and version.
+ // Sanity check: a single catalog batch should not contain multiple entries for the same package identity.
AssertNoMultipleEntriesForSamePackageIdentity(commitTimeStamp, catalogEntries);
// Process .nupkg/.nuspec adds and deletes.
@@ -90,16 +123,16 @@ private async Task> ProcessCatalogEntriesAsync(
{
var processedCatalogEntries = new ConcurrentBag();
- await catalogEntries.ForEachAsync(_maxDegreeOfParallelism, async catalogEntry =>
+ await catalogEntries.ForEachAsync(_maxConcurrentCommitItemsWithinBatch, async catalogEntry =>
{
var packageId = catalogEntry.PackageId;
var normalizedPackageVersion = catalogEntry.NormalizedPackageVersion;
- if (catalogEntry.EntryType == Schema.DataTypes.PackageDetails.ToString())
+ if (catalogEntry.Type.AbsoluteUri == Schema.DataTypes.PackageDetails.AbsoluteUri)
{
- var properties = GetTelemetryProperties(catalogEntry);
+ var telemetryProperties = GetTelemetryProperties(catalogEntry);
- using (_telemetryService.TrackDuration(TelemetryConstants.ProcessPackageDetailsSeconds, properties))
+ using (_telemetryService.TrackDuration(TelemetryConstants.ProcessPackageDetailsSeconds, telemetryProperties))
{
var packageFileName = PackageUtility.GetPackageFileName(
packageId,
@@ -132,13 +165,14 @@ await catalogEntries.ForEachAsync(_maxDegreeOfParallelism, async catalogEntry =>
packageId,
normalizedPackageVersion,
sourceUri,
+ telemetryProperties,
cancellationToken))
{
processedCatalogEntries.Add(catalogEntry);
}
}
}
- else if (catalogEntry.EntryType == Schema.DataTypes.PackageDelete.ToString())
+ else if (catalogEntry.Type.AbsoluteUri == Schema.DataTypes.PackageDelete.AbsoluteUri)
{
var properties = GetTelemetryProperties(catalogEntry);
@@ -175,7 +209,7 @@ private async Task UpdatePackageVersionIndexAsync(
{
var catalogEntryGroups = catalogEntries.GroupBy(catalogEntry => catalogEntry.PackageId);
- await catalogEntryGroups.ForEachAsync(_maxDegreeOfParallelism, async catalogEntryGroup =>
+ await catalogEntryGroups.ForEachAsync(_maxConcurrentCommitItemsWithinBatch, async catalogEntryGroup =>
{
var packageId = catalogEntryGroup.Key;
var properties = new Dictionary()
@@ -190,11 +224,11 @@ await _dnxMaker.UpdatePackageVersionIndexAsync(packageId, versions =>
{
foreach (var catalogEntry in catalogEntryGroup)
{
- if (catalogEntry.EntryType == Schema.DataTypes.PackageDetails.ToString())
+ if (catalogEntry.Type.AbsoluteUri == Schema.DataTypes.PackageDetails.AbsoluteUri)
{
versions.Add(NuGetVersion.Parse(catalogEntry.NormalizedPackageVersion));
}
- else if (catalogEntry.EntryType == Schema.DataTypes.PackageDelete.ToString())
+ else if (catalogEntry.Type.AbsoluteUri == Schema.DataTypes.PackageDelete.AbsoluteUri)
{
versions.Remove(NuGetVersion.Parse(catalogEntry.NormalizedPackageVersion));
}
@@ -214,11 +248,13 @@ private async Task ProcessPackageDetailsAsync(
string packageId,
string normalizedPackageVersion,
Uri sourceUri,
+ Dictionary telemetryProperties,
CancellationToken cancellationToken)
{
if (await ProcessPackageDetailsViaStorageAsync(
packageId,
normalizedPackageVersion,
+ telemetryProperties,
cancellationToken))
{
return true;
@@ -234,12 +270,14 @@ private async Task ProcessPackageDetailsAsync(
packageId,
normalizedPackageVersion,
sourceUri,
+ telemetryProperties,
cancellationToken);
}
private async Task ProcessPackageDetailsViaStorageAsync(
string packageId,
string normalizedPackageVersion,
+ Dictionary telemetryProperties,
CancellationToken cancellationToken)
{
if (_sourceStorage == null)
@@ -260,6 +298,8 @@ private async Task ProcessPackageDetailsViaStorageAsync(
// If the ETag's differ, we'll fall back to using a single HTTP GET request.
var token1 = await _sourceStorage.GetOptimisticConcurrencyControlTokenAsync(sourceUri, cancellationToken);
+ telemetryProperties[TelemetryConstants.SizeInBytes] = sourceBlob.Length.ToString();
+
var nuspec = await GetNuspecAsync(sourceBlob, packageId, cancellationToken);
if (string.IsNullOrEmpty(nuspec))
@@ -311,6 +351,7 @@ private async Task ProcessPackageDetailsViaHttpAsync(
string id,
string version,
Uri sourceUri,
+ Dictionary telemetryProperties,
CancellationToken cancellationToken)
{
var packageDownloader = new PackageDownloader(client, _logger);
@@ -325,6 +366,8 @@ private async Task ProcessPackageDetailsViaHttpAsync(
return false;
}
+ telemetryProperties[TelemetryConstants.SizeInBytes] = stream.Length.ToString();
+
var nuspec = GetNuspec(stream, id);
if (nuspec == null)
@@ -385,7 +428,7 @@ private static void AssertNoMultipleEntriesForSamePackageIdentity(
}
}
- private static async Task GetNuspecAsync(
+ private async Task GetNuspecAsync(
ICloudBlockBlob sourceBlob,
string packageId,
CancellationToken cancellationToken)
@@ -443,19 +486,60 @@ private static Dictionary GetTelemetryProperties(string packageI
};
}
+ private async Task ProcessBatchAsync(
+ CollectorHttpClient client,
+ JToken context,
+ string packageId,
+ CatalogCommitItemBatch batch,
+ CatalogCommitItemBatch lastBatch,
+ CancellationToken cancellationToken)
+ {
+ await Task.Yield();
+
+ using (_telemetryService.TrackDuration(
+ TelemetryConstants.ProcessBatchSeconds,
+ new Dictionary()
+ {
+ { TelemetryConstants.Id, packageId },
+ { TelemetryConstants.BatchItemCount, batch.Items.Count.ToString() }
+ }))
+ {
+ await OnProcessBatchAsync(
+ client,
+ batch.Items,
+ context,
+ batch.CommitTimeStamp,
+ isLastBatch: false,
+ cancellationToken: cancellationToken);
+ }
+ }
+
private sealed class CatalogEntry
{
+ internal DateTime CommitTimeStamp { get; }
internal string PackageId { get; }
internal string NormalizedPackageVersion { get; }
- internal string EntryType { get; }
- internal JToken Entry { get; }
+ internal Uri Type { get; }
- internal CatalogEntry(string packageId, string normalizedPackageVersion, string entryType, JToken entry)
+ private CatalogEntry(DateTime commitTimeStamp, string packageId, string normalizedPackageVersion, Uri type)
{
+ CommitTimeStamp = commitTimeStamp;
PackageId = packageId;
NormalizedPackageVersion = normalizedPackageVersion;
- EntryType = entryType;
- Entry = entry;
+ Type = type;
+ }
+
+ internal static CatalogEntry Create(CatalogCommitItem item)
+ {
+ var typeUri = item.TypeUris.Single(uri =>
+ uri.AbsoluteUri == Schema.DataTypes.PackageDetails.AbsoluteUri ||
+ uri.AbsoluteUri == Schema.DataTypes.PackageDelete.AbsoluteUri);
+
+ return new CatalogEntry(
+ item.CommitTimeStamp,
+ item.PackageIdentity.Id.ToLowerInvariant(),
+ item.PackageIdentity.Version.ToNormalizedString().ToLowerInvariant(),
+ typeUri);
}
}
}
diff --git a/src/Catalog/Dnx/DnxMaker.cs b/src/Catalog/Dnx/DnxMaker.cs
index 478d7a509..732b19e71 100644
--- a/src/Catalog/Dnx/DnxMaker.cs
+++ b/src/Catalog/Dnx/DnxMaker.cs
@@ -173,7 +173,7 @@ public async Task UpdatePackageVersionIndexAsync(string id, Action result = new List(versions);
+ var result = new List(versions);
if (result.Any())
{
diff --git a/src/Catalog/FetchCatalogCommitsAsync.cs b/src/Catalog/FetchCatalogCommitsAsync.cs
new file mode 100644
index 000000000..cde21651b
--- /dev/null
+++ b/src/Catalog/FetchCatalogCommitsAsync.cs
@@ -0,0 +1,14 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ internal delegate Task> FetchCatalogCommitsAsync(
+ CollectorHttpClient client,
+ ReadWriteCursor front,
+ CancellationToken cancellationToken);
+}
\ No newline at end of file
diff --git a/src/Catalog/GetCatalogCommitItemKey.cs b/src/Catalog/GetCatalogCommitItemKey.cs
new file mode 100644
index 000000000..d28bf0659
--- /dev/null
+++ b/src/Catalog/GetCatalogCommitItemKey.cs
@@ -0,0 +1,7 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ public delegate string GetCatalogCommitItemKey(CatalogCommitItem item);
+}
\ No newline at end of file
diff --git a/src/Catalog/Helpers/CatalogProperties.cs b/src/Catalog/Helpers/CatalogProperties.cs
index 9a9fecafc..acef7ed86 100644
--- a/src/Catalog/Helpers/CatalogProperties.cs
+++ b/src/Catalog/Helpers/CatalogProperties.cs
@@ -2,6 +2,11 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
+using System.Diagnostics;
+using System.Threading;
+using System.Threading.Tasks;
+using Newtonsoft.Json.Linq;
+using NuGet.Services.Metadata.Catalog.Persistence;
namespace NuGet.Services.Metadata.Catalog.Helpers
{
@@ -17,5 +22,68 @@ public CatalogProperties(DateTime? lastCreated, DateTime? lastDeleted, DateTime?
LastDeleted = lastDeleted;
LastEdited = lastEdited;
}
+
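+ /// <summary>
+ /// Asynchronously reads and returns top-level metadata from the catalog's index.json.
+ /// </summary>
+ /// <remarks>The metadata values include "nuget:lastCreated", "nuget:lastDeleted", and "nuget:lastEdited",
+ /// which are the timestamps of the catalog cursor.
+ /// </remarks>
+ /// <param name="storage">The storage from which index.json is loaded.</param>
+ /// <param name="telemetryService">The telemetry service used to track the index read duration.</param>
+ /// <returns>A task that represents the asynchronous operation.
+ /// The task result (<see cref="Task{TResult}.Result" />) returns a <see cref="CatalogProperties" />.</returns>
+ /// <exception cref="ArgumentNullException">Thrown if <paramref name="storage" /> or <paramref name="telemetryService" /> is <c>null</c>.</exception>
+ /// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken" />
+ /// is cancelled.</exception>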
+ public static async Task ReadAsync(
+ IStorage storage,
+ ITelemetryService telemetryService,
+ CancellationToken cancellationToken)
+ {
+ if (storage == null)
+ {
+ throw new ArgumentNullException(nameof(storage));
+ }
+
+ if (telemetryService == null)
+ {
+ throw new ArgumentNullException(nameof(telemetryService));
+ }
+
+ cancellationToken.ThrowIfCancellationRequested();
+
+ DateTime? lastCreated = null;
+ DateTime? lastDeleted = null;
+ DateTime? lastEdited = null;
+
+ var stopwatch = Stopwatch.StartNew();
+ var indexUri = storage.ResolveUri("index.json");
+ var json = await storage.LoadStringAsync(indexUri, cancellationToken);
+
+ if (json != null)
+ {
+ var obj = JObject.Parse(json);
+ telemetryService.TrackCatalogIndexReadDuration(stopwatch.Elapsed, indexUri);
+ JToken token;
+
+ if (obj.TryGetValue("nuget:lastCreated", out token))
+ {
+ lastCreated = token.ToObject().ToUniversalTime();
+ }
+
+ if (obj.TryGetValue("nuget:lastDeleted", out token))
+ {
+ lastDeleted = token.ToObject().ToUniversalTime();
+ }
+
+ if (obj.TryGetValue("nuget:lastEdited", out token))
+ {
+ lastEdited = token.ToObject().ToUniversalTime();
+ }
+ }
+
+ return new CatalogProperties(lastCreated, lastDeleted, lastEdited);
+ }
}
}
\ No newline at end of file
diff --git a/src/Catalog/Helpers/FeedHelpers.cs b/src/Catalog/Helpers/FeedHelpers.cs
index f8d67fb97..e4162cbea 100644
--- a/src/Catalog/Helpers/FeedHelpers.cs
+++ b/src/Catalog/Helpers/FeedHelpers.cs
@@ -3,7 +3,6 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Threading;
@@ -11,7 +10,6 @@
using System.Web;
using System.Xml.Linq;
using Microsoft.Extensions.Logging;
-using Newtonsoft.Json.Linq;
using NuGet.Services.Metadata.Catalog.Persistence;
namespace NuGet.Services.Metadata.Catalog.Helpers
@@ -30,69 +28,6 @@ public static HttpClient CreateHttpClient(Func handlerFunc)
return new HttpClient(handler);
}
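- /// <summary>
- /// Asynchronously reads and returns top-level metadata from the catalog's index.json.
- /// </summary>
- /// <remarks>The metadata values include "nuget:lastCreated", "nuget:lastDeleted", and "nuget:lastEdited",
- /// which are the timestamps of the catalog cursor.
- /// </remarks>
- /// <param name="storage">The storage from which index.json is loaded.</param>
- /// <param name="telemetryService">The telemetry service used to track the index read duration.</param>
- /// <returns>A task that represents the asynchronous operation.
- /// The task result (<see cref="Task{TResult}.Result" />) returns a <see cref="CatalogProperties" />.</returns>
- /// <exception cref="ArgumentNullException">Thrown if <paramref name="storage" /> or <paramref name="telemetryService" /> is <c>null</c>.</exception>
- /// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken" />
- /// is cancelled.</exception>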
- public static async Task GetCatalogPropertiesAsync(
- IStorage storage,
- ITelemetryService telemetryService,
- CancellationToken cancellationToken)
- {
- if (storage == null)
- {
- throw new ArgumentNullException(nameof(storage));
- }
-
- if (telemetryService == null)
- {
- throw new ArgumentNullException(nameof(telemetryService));
- }
-
- cancellationToken.ThrowIfCancellationRequested();
-
- DateTime? lastCreated = null;
- DateTime? lastDeleted = null;
- DateTime? lastEdited = null;
-
- var stopwatch = Stopwatch.StartNew();
- var indexUri = storage.ResolveUri("index.json");
- var json = await storage.LoadStringAsync(indexUri, cancellationToken);
-
- if (json != null)
- {
- var obj = JObject.Parse(json);
- telemetryService.TrackCatalogIndexReadDuration(stopwatch.Elapsed, indexUri);
- JToken token;
-
- if (obj.TryGetValue("nuget:lastCreated", out token))
- {
- lastCreated = token.ToObject().ToUniversalTime();
- }
-
- if (obj.TryGetValue("nuget:lastDeleted", out token))
- {
- lastDeleted = token.ToObject().ToUniversalTime();
- }
-
- if (obj.TryGetValue("nuget:lastEdited", out token))
- {
- lastEdited = token.ToObject().ToUniversalTime();
- }
- }
-
- return new CatalogProperties(lastCreated, lastDeleted, lastEdited);
- }
-
///
/// Builds a for accessing the metadata of a specific package on the feed.
///
diff --git a/src/Catalog/Helpers/Utils.cs b/src/Catalog/Helpers/Utils.cs
index 795cd4941..3bbcdb01d 100644
--- a/src/Catalog/Helpers/Utils.cs
+++ b/src/Catalog/Helpers/Utils.cs
@@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
+using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;
@@ -346,21 +347,13 @@ public static bool IsType(JToken context, JToken obj, Uri type)
return false;
}
- public static bool IsType(JToken context, JObject obj, Uri[] types)
+ public static Uri Expand(JToken context, JToken token)
{
- foreach (Uri type in types)
- {
- if (IsType(context, obj, type))
- {
- return true;
- }
- }
- return false;
+ return Expand(context, token.ToString());
}
- public static Uri Expand(JToken context, JToken token)
+ public static Uri Expand(JToken context, string term)
{
- string term = token.ToString();
if (term.StartsWith("http:", StringComparison.OrdinalIgnoreCase))
{
return new Uri(term);
@@ -563,5 +556,25 @@ public static void TraceException(Exception e)
}
}
}
+
+ internal static T Deserialize(JObject jObject, string propertyName)
+ {
+ if (jObject == null)
+ {
+ throw new ArgumentNullException(nameof(jObject));
+ }
+
+ if (string.IsNullOrEmpty(propertyName))
+ {
+ throw new ArgumentException(Strings.ArgumentMustNotBeNullOrEmpty, nameof(propertyName));
+ }
+
+ if (!jObject.TryGetValue(propertyName, out var value) || value == null)
+ {
+ throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, Strings.PropertyRequired, propertyName));
+ }
+
+ return value.ToObject();
+ }
}
}
\ No newline at end of file
diff --git a/src/Catalog/IHttpRetryStrategy.cs b/src/Catalog/IHttpRetryStrategy.cs
new file mode 100644
index 000000000..ea03c3858
--- /dev/null
+++ b/src/Catalog/IHttpRetryStrategy.cs
@@ -0,0 +1,15 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ public interface IHttpRetryStrategy
+ {
+ Task SendAsync(HttpClient client, Uri address, CancellationToken cancellationToken);
+ }
+}
\ No newline at end of file
diff --git a/src/Catalog/NuGet.Services.Metadata.Catalog.csproj b/src/Catalog/NuGet.Services.Metadata.Catalog.csproj
index c346b91b9..1da06185f 100644
--- a/src/Catalog/NuGet.Services.Metadata.Catalog.csproj
+++ b/src/Catalog/NuGet.Services.Metadata.Catalog.csproj
@@ -82,21 +82,30 @@
+
+
+
+
+
+
+
+
+
@@ -114,6 +123,7 @@
+
diff --git a/src/Catalog/Persistence/AzureCloudBlockBlob.cs b/src/Catalog/Persistence/AzureCloudBlockBlob.cs
index 0071e29ad..4e0a431df 100644
--- a/src/Catalog/Persistence/AzureCloudBlockBlob.cs
+++ b/src/Catalog/Persistence/AzureCloudBlockBlob.cs
@@ -26,6 +26,7 @@ public string ContentMD5
}
public string ETag => _blob.Properties.ETag;
+ public long Length => _blob.Properties.Length;
public Uri Uri => _blob.Uri;
public AzureCloudBlockBlob(CloudBlockBlob blob)
diff --git a/src/Catalog/Persistence/AzureStorage.cs b/src/Catalog/Persistence/AzureStorage.cs
index 714b8705d..444143df4 100644
--- a/src/Catalog/Persistence/AzureStorage.cs
+++ b/src/Catalog/Persistence/AzureStorage.cs
@@ -20,7 +20,6 @@ public class AzureStorage : Storage, IAzureStorage
{
private readonly bool _compressContent;
private readonly CloudBlobDirectory _directory;
- private readonly BlobRequestOptions _blobRequestOptions;
private readonly bool _useServerSideCopy;
public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30);
@@ -72,22 +71,24 @@ private AzureStorage(CloudBlobDirectory directory, Uri baseAddress, TimeSpan max
{
_directory = directory;
+ // Unless overridden at the level of a single API call, these options will apply to all service calls that
+ // use BlobRequestOptions.
+ _directory.ServiceClient.DefaultRequestOptions = new BlobRequestOptions()
+ {
+ ServerTimeout = serverTimeout,
+ MaximumExecutionTime = maxExecutionTime,
+ RetryPolicy = new ExponentialRetry()
+ };
+
if (_directory.Container.CreateIfNotExists())
{
_directory.Container.SetPermissions(new BlobContainerPermissions { PublicAccess = BlobContainerPublicAccessType.Blob });
if (Verbose)
{
- Trace.WriteLine(String.Format("Created '{0}' publish container", _directory.Container.Name));
+ Trace.WriteLine(string.Format("Created '{0}' publish container", _directory.Container.Name));
}
}
-
- _blobRequestOptions = new BlobRequestOptions()
- {
- ServerTimeout = serverTimeout,
- MaximumExecutionTime = maxExecutionTime,
- RetryPolicy = new ExponentialRetry()
- };
}
public override async Task GetOptimisticConcurrencyControlTokenAsync(
@@ -134,7 +135,7 @@ public override bool Exists(string fileName)
}
if (Verbose)
{
- Trace.WriteLine(String.Format("The blob {0} does not exist.", packageRegistrationUri));
+ Trace.WriteLine(string.Format("The blob {0} does not exist.", packageRegistrationUri));
}
return false;
}
@@ -230,11 +231,7 @@ protected override async Task OnSaveAsync(Uri resourceUri, StorageContent conten
destinationStream.Seek(0, SeekOrigin.Begin);
- await blob.UploadFromStreamAsync(destinationStream,
- accessCondition: null,
- options: _blobRequestOptions,
- operationContext: null,
- cancellationToken: cancellationToken);
+ await blob.UploadFromStreamAsync(destinationStream, cancellationToken);
Trace.WriteLine(string.Format("Saved compressed blob {0} to container {1}", blob.Uri.ToString(), _directory.Container.Name));
}
@@ -243,11 +240,7 @@ await blob.UploadFromStreamAsync(destinationStream,
{
using (Stream stream = content.GetContentStream())
{
- await blob.UploadFromStreamAsync(stream,
- accessCondition: null,
- options: _blobRequestOptions,
- operationContext: null,
- cancellationToken: cancellationToken);
+ await blob.UploadFromStreamAsync(stream, cancellationToken);
}
Trace.WriteLine(string.Format("Saved uncompressed blob {0} to container {1}", blob.Uri.ToString(), _directory.Container.Name));
@@ -306,32 +299,30 @@ protected override async Task OnLoadAsync(Uri resourceUri, Cance
if (blob.Exists())
{
- MemoryStream originalStream = new MemoryStream();
- await blob.DownloadToStreamAsync(originalStream,
- accessCondition: null,
- options: _blobRequestOptions,
- operationContext: null,
- cancellationToken: cancellationToken);
-
- originalStream.Seek(0, SeekOrigin.Begin);
-
string content;
- if (blob.Properties.ContentEncoding == "gzip")
+ using (var originalStream = new MemoryStream())
{
- using (var uncompressedStream = new GZipStream(originalStream, CompressionMode.Decompress))
+ await blob.DownloadToStreamAsync(originalStream, cancellationToken);
+
+ originalStream.Seek(0, SeekOrigin.Begin);
+
+ if (blob.Properties.ContentEncoding == "gzip")
{
- using (var reader = new StreamReader(uncompressedStream))
+ using (var uncompressedStream = new GZipStream(originalStream, CompressionMode.Decompress))
{
- content = await reader.ReadToEndAsync();
+ using (var reader = new StreamReader(uncompressedStream))
+ {
+ content = await reader.ReadToEndAsync();
+ }
}
}
- }
- else
- {
- using (var reader = new StreamReader(originalStream))
+ else
{
- content = await reader.ReadToEndAsync();
+ using (var reader = new StreamReader(originalStream))
+ {
+ content = await reader.ReadToEndAsync();
+ }
}
}
@@ -340,7 +331,7 @@ await blob.DownloadToStreamAsync(originalStream,
if (Verbose)
{
- Trace.WriteLine(String.Format("Can't load '{0}'. Blob doesn't exist", resourceUri));
+ Trace.WriteLine(string.Format("Can't load '{0}'. Blob doesn't exist", resourceUri));
}
return null;
@@ -353,7 +344,7 @@ protected override async Task OnDeleteAsync(Uri resourceUri, CancellationToken c
CloudBlockBlob blob = _directory.GetBlockBlobReference(name);
await blob.DeleteAsync(deleteSnapshotsOption: DeleteSnapshotsOption.IncludeSnapshots,
accessCondition: null,
- options: _blobRequestOptions,
+ options: null,
operationContext: null,
cancellationToken: cancellationToken);
}
diff --git a/src/Catalog/Persistence/ICloudBlockBlob.cs b/src/Catalog/Persistence/ICloudBlockBlob.cs
index 1ec994b14..1e17a7b0d 100644
--- a/src/Catalog/Persistence/ICloudBlockBlob.cs
+++ b/src/Catalog/Persistence/ICloudBlockBlob.cs
@@ -15,6 +15,7 @@ public interface ICloudBlockBlob
{
string ContentMD5 { get; set; }
string ETag { get; }
+ long Length { get; }
Uri Uri { get; }
Task ExistsAsync(CancellationToken cancellationToken);
diff --git a/src/Catalog/ProcessCommitItemBatchAsync.cs b/src/Catalog/ProcessCommitItemBatchAsync.cs
new file mode 100644
index 000000000..71664dc0e
--- /dev/null
+++ b/src/Catalog/ProcessCommitItemBatchAsync.cs
@@ -0,0 +1,17 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Threading;
+using System.Threading.Tasks;
+using Newtonsoft.Json.Linq;
+
+namespace NuGet.Services.Metadata.Catalog
+{
+ public delegate Task ProcessCommitItemBatchAsync(
+ CollectorHttpClient client,
+ JToken context,
+ string packageId,
+ CatalogCommitItemBatch batch,
+ CatalogCommitItemBatch lastBatch,
+ CancellationToken cancellationToken);
+}
\ No newline at end of file
diff --git a/src/Catalog/Registration/RegistrationCollector.cs b/src/Catalog/Registration/RegistrationCollector.cs
index 7a318af27..983edc8c7 100644
--- a/src/Catalog/Registration/RegistrationCollector.cs
+++ b/src/Catalog/Registration/RegistrationCollector.cs
@@ -3,11 +3,10 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using NuGet.Services.Metadata.Catalog.Helpers;
using NuGet.Services.Metadata.Catalog.Persistence;
@@ -27,9 +26,7 @@ public class RegistrationCollector : SortingGraphCollector
private readonly StorageFactory _semVer2StorageFactory;
private readonly ShouldIncludeRegistrationPackage _shouldIncludeSemVer2;
private readonly int _maxConcurrentBatches;
-
- // Doesn't exist until .NET 4.6
- private static readonly Task CompletedTask = Task.FromResult(0);
+ private readonly ILogger _logger;
public RegistrationCollector(
Uri index,
@@ -37,11 +34,19 @@ public RegistrationCollector(
StorageFactory semVer2StorageFactory,
Uri contentBaseAddress,
ITelemetryService telemetryService,
+ ILogger logger,
Func handlerFunc = null,
+ IHttpRetryStrategy httpRetryStrategy = null,
int maxConcurrentBatches = DefaultMaxConcurrentBatches)
- : base(index, new Uri[] { Schema.DataTypes.PackageDetails, Schema.DataTypes.PackageDelete }, telemetryService, handlerFunc)
+ : base(
+ index,
+ new Uri[] { Schema.DataTypes.PackageDetails, Schema.DataTypes.PackageDelete },
+ telemetryService,
+ handlerFunc,
+ httpRetryStrategy)
{
_legacyStorageFactory = legacyStorageFactory ?? throw new ArgumentNullException(nameof(legacyStorageFactory));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
_semVer2StorageFactory = semVer2StorageFactory;
_shouldIncludeSemVer2 = GetShouldIncludeRegistrationPackage(_semVer2StorageFactory);
ContentBaseAddress = contentBaseAddress;
@@ -59,7 +64,8 @@ public RegistrationCollector(
public Uri ContentBaseAddress { get; }
- protected override Task> CreateBatchesAsync(IEnumerable catalogItems)
+ protected override Task> CreateBatchesAsync(
+ IEnumerable catalogItems)
{
// Grouping batches by commit is slow if it contains
// the same package registration id over and over again.
@@ -80,31 +86,14 @@ protected override Task> CreateBatchesAsync(IEnume
// from the correct location (even though we may have
// a little rework).
- var batches = catalogItems
- .GroupBy(item => GetKey(item.Value))
- .Select(group => new CatalogItemBatch(
- group.Min(item => item.CommitTimeStamp),
- group));
+ var batches = CatalogCommitUtilities.CreateCommitItemBatches(catalogItems, GetKey);
return Task.FromResult(batches);
}
- private async Task CreateBatchesAsync(ReadWriteCursor front, ReadCursor back, JObject page)
+ protected override string GetKey(CatalogCommitItem item)
{
- IEnumerable pageItems = page["items"]
- .Select(item => new CatalogItem((JObject)item))
- .Where(item => item.CommitTimeStamp > front.Value && item.CommitTimeStamp <= back.Value);
-
- IEnumerable batches = await CreateBatchesAsync(pageItems);
-
- return batches
- .OrderBy(batch => batch.CommitTimeStamp)
- .ToArray();
- }
-
- protected override string GetKey(JObject item)
- {
- return item["nuget:id"].ToString().ToLowerInvariant();
+ return CatalogCommitUtilities.GetPackageIdKey(item);
}
// Summary:
@@ -120,111 +109,22 @@ protected override string GetKey(JObject item)
// been transactional. Actively cancelling tasks would make an inconsistent registration more likely.
// 6. Update the cursor if and only if all preceding commits and the current (oldest) commit have been
// fully and successfully processed.
- protected override async Task FetchAsync(
+ protected override Task FetchAsync(
CollectorHttpClient client,
ReadWriteCursor front,
ReadCursor back,
CancellationToken cancellationToken)
{
- IEnumerable catalogItems = await FetchCatalogItemsAsync(client, front, cancellationToken);
-
- var hasAnyBatchFailed = false;
- var hasAnyBatchBeenProcessed = false;
-
- foreach (CatalogItem catalogItem in catalogItems)
- {
- JObject page = await client.GetJObjectAsync(catalogItem.Uri, cancellationToken);
- JToken context = page["@context"];
- CatalogItemBatch[] batches = await CreateBatchesAsync(front, back, page);
- SortedDictionary commitBatchTasksMap = CreateCommitBatchTasksMap(batches);
-
- var unprocessedBatches = new Queue(batches);
- var processingBatches = new Queue();
-
- CatalogItemBatch lastBatch = unprocessedBatches.LastOrDefault();
- var exceptions = new List();
-
- EnqueueBatchesIfNoFailures(
- client,
- context,
- commitBatchTasksMap,
- unprocessedBatches,
- processingBatches,
- lastBatch,
- cancellationToken);
-
- while (processingBatches.Any())
- {
- var activeTasks = processingBatches.Where(batch => !batch.Task.IsCompleted)
- .Select(batch => batch.Task)
- .DefaultIfEmpty(CompletedTask);
-
- await Task.WhenAny(activeTasks);
-
- while (!hasAnyBatchFailed && commitBatchTasksMap.Any())
- {
- var commitBatchTasks = commitBatchTasksMap.First().Value;
- var isCommitFullyProcessed = commitBatchTasks.BatchTasks.All(batch => batch.Task != null && batch.Task.IsCompleted);
-
- if (!isCommitFullyProcessed)
- {
- break;
- }
-
- var isCommitSuccessfullyProcessed = commitBatchTasks.BatchTasks.All(batch => batch.Task.Status == TaskStatus.RanToCompletion);
-
- if (isCommitSuccessfullyProcessed)
- {
- var commitTimeStamp = commitBatchTasks.CommitTimeStamp;
-
- front.Value = commitTimeStamp;
-
- await front.SaveAsync(cancellationToken);
-
- Trace.TraceInformation($"{nameof(RegistrationCollector)}.{nameof(FetchAsync)} {nameof(front)}.{nameof(front.Value)} saved since timestamp changed from previous: {{0}}", front);
-
- DequeueBatchesWhileMatches(processingBatches, batch => batch.CommitTimeStamp == commitTimeStamp);
-
- commitBatchTasksMap.Remove(commitTimeStamp);
- }
- else // Canceled or Failed
- {
- hasAnyBatchFailed = true;
-
- exceptions.AddRange(
- commitBatchTasks.BatchTasks
- .Select(batch => batch.Task)
- .Where(task => (task.IsFaulted || task.IsCanceled) && task.Exception != null)
- .Select(task => task.Exception));
- }
- }
-
- if (hasAnyBatchFailed)
- {
- DequeueBatchesWhileMatches(processingBatches, batch => batch.Task.IsCompleted);
- }
-
- hasAnyBatchBeenProcessed = true;
-
- EnqueueBatchesIfNoFailures(
- client,
- context,
- commitBatchTasksMap,
- unprocessedBatches,
- processingBatches,
- lastBatch,
- cancellationToken);
- }
-
- if (hasAnyBatchFailed)
- {
- var innerException = exceptions.Count == 1 ? exceptions.Single() : new AggregateException(exceptions);
-
- throw new BatchProcessingException(innerException);
- }
- }
-
- return hasAnyBatchBeenProcessed;
+ return CatalogCommitUtilities.ProcessCatalogCommitsAsync(
+ client,
+ front,
+ back,
+ FetchCatalogCommitsAsync,
+ CreateBatchesAsync,
+ ProcessBatchAsync,
+ _maxConcurrentBatches,
+ _logger,
+ cancellationToken);
}
protected override async Task ProcessGraphsAsync(
@@ -282,64 +182,12 @@ public static ShouldIncludeRegistrationPackage GetShouldIncludeRegistrationPacka
return (k, u, g) => !NuGetVersionUtility.IsGraphSemVer2(k.Version, u, g);
}
- private static void DequeueBatchesWhileMatches(Queue batches, Func isMatch)
- {
- BatchTask batch;
-
- while ((batch = batches.FirstOrDefault()) != null)
- {
- if (isMatch(batch))
- {
- batches.Dequeue();
- }
- else
- {
- break;
- }
- }
- }
-
- private void EnqueueBatchesIfNoFailures(
- CollectorHttpClient client,
- JToken context,
- SortedDictionary commitBatchTasksMap,
- Queue unprocessedBatches,
- Queue processingBatches,
- CatalogItemBatch lastBatch,
- CancellationToken cancellationToken)
- {
- var hasAnyBatchFailed = processingBatches.Any(batch => batch.Task.IsFaulted || batch.Task.IsCanceled);
-
- if (hasAnyBatchFailed)
- {
- return;
- }
-
- var batchesToEnqueue = Math.Min(
- _maxConcurrentBatches - processingBatches.Count(batch => !batch.Task.IsCompleted),
- unprocessedBatches.Count);
-
- for (var i = 0; i < batchesToEnqueue; ++i)
- {
- var batch = unprocessedBatches.Dequeue();
- var batchItem = batch.Items.First();
- var packageId = GetKey(batchItem.Value);
-
- var batchTask = commitBatchTasksMap[batchItem.CommitTimeStamp].BatchTasks
- .Single(bt => bt.PackageId == packageId);
-
- batchTask.Task = ProcessBatchAsync(client, context, packageId, batch, lastBatch, cancellationToken);
-
- processingBatches.Enqueue(batchTask);
- }
- }
-
private async Task ProcessBatchAsync(
CollectorHttpClient client,
JToken context,
string packageId,
- CatalogItemBatch batch,
- CatalogItemBatch lastBatch,
+ CatalogCommitItemBatch batch,
+ CatalogCommitItemBatch lastBatch,
CancellationToken cancellationToken)
{
await Task.Yield();
@@ -354,82 +202,12 @@ private async Task ProcessBatchAsync(
{
await OnProcessBatchAsync(
client,
- batch.Items.Select(item => item.Value),
+ batch.Items,
context,
batch.CommitTimeStamp,
- batch.CommitTimeStamp == lastBatch.CommitTimeStamp,
- cancellationToken);
- }
- }
-
- private SortedDictionary CreateCommitBatchTasksMap(CatalogItemBatch[] batches)
- {
- var map = new SortedDictionary();
-
- foreach (var batch in batches)
- {
- var jObject = batch.Items.First().Value;
- var packageId = GetKey(jObject);
- var batchTask = new BatchTask(batch.CommitTimeStamp, packageId);
-
- foreach (var commitTimeStamp in batch.Items.Select(item => item.CommitTimeStamp))
- {
- CommitBatchTasks commitBatchTasks;
-
- if (!map.TryGetValue(commitTimeStamp, out commitBatchTasks))
- {
- commitBatchTasks = new CommitBatchTasks(commitTimeStamp);
-
- map[commitTimeStamp] = commitBatchTasks;
- }
-
- commitBatchTasks.BatchTasks.Add(batchTask);
- }
- }
-
- return map;
- }
-
- private sealed class BatchTask
- {
- internal BatchTask(DateTime commitTimeStamp, string packageId)
- {
- CommitTimeStamp = commitTimeStamp;
- PackageId = packageId;
- }
-
- internal DateTime CommitTimeStamp { get; }
- internal string PackageId { get; }
- internal Task Task { get; set; }
-
- public override int GetHashCode()
- {
- return PackageId.GetHashCode();
- }
-
- public override bool Equals(object obj)
- {
- var other = obj as BatchTask;
-
- if (ReferenceEquals(other, null))
- {
- return false;
- }
-
- return GetHashCode() == other.GetHashCode();
- }
- }
-
- private sealed class CommitBatchTasks
- {
- internal CommitBatchTasks(DateTime commitTimeStamp)
- {
- BatchTasks = new HashSet();
- CommitTimeStamp = commitTimeStamp;
+ isLastBatch: false,
+ cancellationToken: cancellationToken);
}
-
- internal HashSet BatchTasks { get; }
- internal DateTime CommitTimeStamp { get; }
}
}
}
\ No newline at end of file
diff --git a/src/Catalog/RetryWithExponentialBackoff.cs b/src/Catalog/RetryWithExponentialBackoff.cs
index 0cf7acc8c..56505e27f 100644
--- a/src/Catalog/RetryWithExponentialBackoff.cs
+++ b/src/Catalog/RetryWithExponentialBackoff.cs
@@ -10,7 +10,7 @@
namespace NuGet.Services.Metadata.Catalog
{
// See https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/implement-resilient-applications/implement-custom-http-call-retries-exponential-backoff
- internal sealed class RetryWithExponentialBackoff
+ internal sealed class RetryWithExponentialBackoff : IHttpRetryStrategy
{
private readonly ushort _maximumRetries;
private readonly TimeSpan _delay;
@@ -23,7 +23,7 @@ internal RetryWithExponentialBackoff()
_maximumDelay = TimeSpan.FromSeconds(10);
}
- internal async Task SendAsync(HttpClient client, Uri address, CancellationToken cancellationToken)
+ public async Task SendAsync(HttpClient client, Uri address, CancellationToken cancellationToken)
{
var backoff = new ExponentialBackoff(_maximumRetries, _delay, _maximumDelay);
@@ -56,9 +56,10 @@ internal async Task SendAsync(HttpClient client, Uri addres
private static bool IsTransientError(HttpResponseMessage response)
{
- return (int)response.StatusCode >= 500 &&
- response.StatusCode != HttpStatusCode.NotImplemented &&
- response.StatusCode != HttpStatusCode.HttpVersionNotSupported;
+ return response == null
+ || ((int)response.StatusCode >= 500 &&
+ response.StatusCode != HttpStatusCode.NotImplemented &&
+ response.StatusCode != HttpStatusCode.HttpVersionNotSupported);
}
private sealed class ExponentialBackoff
diff --git a/src/Catalog/SortingCollector.cs b/src/Catalog/SortingCollector.cs
index 7524c18b4..bc06124cd 100644
--- a/src/Catalog/SortingCollector.cs
+++ b/src/Catalog/SortingCollector.cs
@@ -13,29 +13,33 @@ namespace NuGet.Services.Metadata.Catalog
{
public abstract class SortingCollector : CommitCollector where T : IEquatable
{
- public SortingCollector(Uri index, ITelemetryService telemetryService, Func handlerFunc = null)
- : base(index, telemetryService, handlerFunc)
+ public SortingCollector(
+ Uri index,
+ ITelemetryService telemetryService,
+ Func handlerFunc = null,
+ IHttpRetryStrategy httpRetryStrategy = null)
+ : base(index, telemetryService, handlerFunc, httpRetryStrategy: httpRetryStrategy)
{
}
protected override async Task OnProcessBatchAsync(
CollectorHttpClient client,
- IEnumerable items,
+ IEnumerable items,
JToken context,
DateTime commitTimeStamp,
bool isLastBatch,
CancellationToken cancellationToken)
{
- IDictionary> sortedItems = new Dictionary>();
+ var sortedItems = new Dictionary>();
- foreach (JObject item in items)
+ foreach (CatalogCommitItem item in items)
{
T key = GetKey(item);
- IList itemList;
+ IList itemList;
if (!sortedItems.TryGetValue(key, out itemList))
{
- itemList = new List();
+ itemList = new List();
sortedItems.Add(key, itemList);
}
@@ -44,7 +48,7 @@ protected override async Task OnProcessBatchAsync(
IList tasks = new List();
- foreach (KeyValuePair> sortedBatch in sortedItems)
+ foreach (KeyValuePair> sortedBatch in sortedItems)
{
Task task = ProcessSortedBatchAsync(client, sortedBatch, context, cancellationToken);
@@ -56,11 +60,11 @@ protected override async Task OnProcessBatchAsync(
return true;
}
- protected abstract T GetKey(JObject item);
+ protected abstract T GetKey(CatalogCommitItem item);
protected abstract Task ProcessSortedBatchAsync(
CollectorHttpClient client,
- KeyValuePair> sortedBatch,
+ KeyValuePair> sortedBatch,
JToken context,
CancellationToken cancellationToken);
}
diff --git a/src/Catalog/SortingGraphCollector.cs b/src/Catalog/SortingGraphCollector.cs
index af1034fa1..75e70ae65 100644
--- a/src/Catalog/SortingGraphCollector.cs
+++ b/src/Catalog/SortingGraphCollector.cs
@@ -20,15 +20,16 @@ public SortingGraphCollector(
Uri index,
Uri[] types,
ITelemetryService telemetryService,
- Func handlerFunc = null)
- : base(index, telemetryService, handlerFunc)
+ Func handlerFunc = null,
+ IHttpRetryStrategy httpRetryStrategy = null)
+ : base(index, telemetryService, handlerFunc, httpRetryStrategy)
{
_types = types;
}
protected override async Task ProcessSortedBatchAsync(
CollectorHttpClient client,
- KeyValuePair> sortedBatch,
+ KeyValuePair> sortedBatch,
JToken context,
CancellationToken cancellationToken)
{
@@ -37,16 +38,25 @@ protected override async Task ProcessSortedBatchAsync(
foreach (var item in sortedBatch.Value)
{
- if (Utils.IsType((JObject)context, item, _types))
+ var isMatch = false;
+
+ foreach (Uri type in _types)
{
- var itemUri = item["@id"].ToString();
+ if (item.TypeUris.Any(typeUri => typeUri.AbsoluteUri == type.AbsoluteUri))
+ {
+ isMatch = true;
+ break;
+ }
+ }
+ if (isMatch)
+ {
// Load package details from catalog.
// Download the graph to a read-only container. This allows operations on each graph to be safely
// parallelized.
- var task = client.GetGraphAsync(new Uri(itemUri), readOnly: true, token: cancellationToken);
+ var task = client.GetGraphAsync(item.Uri, readOnly: true, token: cancellationToken);
- graphTasks.Add(itemUri, task);
+ graphTasks.Add(item.Uri.AbsoluteUri, task);
}
}
diff --git a/src/Catalog/SortingIdCollector.cs b/src/Catalog/SortingIdCollector.cs
index b39ba1ac6..ea45ed787 100644
--- a/src/Catalog/SortingIdCollector.cs
+++ b/src/Catalog/SortingIdCollector.cs
@@ -3,19 +3,23 @@
using System;
using System.Net.Http;
-using Newtonsoft.Json.Linq;
namespace NuGet.Services.Metadata.Catalog
{
public abstract class SortingIdCollector : SortingCollector
{
- public SortingIdCollector(Uri index, ITelemetryService telemetryService, Func handlerFunc = null) : base(index, telemetryService, handlerFunc)
+ public SortingIdCollector(
+ Uri index,
+ ITelemetryService telemetryService,
+ Func handlerFunc = null,
+ IHttpRetryStrategy retryStrategy = null)
+ : base(index, telemetryService, handlerFunc, retryStrategy)
{
}
- protected override string GetKey(JObject item)
+ protected override string GetKey(CatalogCommitItem item)
{
- return item["nuget:id"].ToString();
+ return item.PackageIdentity.Id;
}
}
-}
+}
\ No newline at end of file
diff --git a/src/Catalog/SortingIdVersionCollector.cs b/src/Catalog/SortingIdVersionCollector.cs
index 8911e7221..b21fd0354 100644
--- a/src/Catalog/SortingIdVersionCollector.cs
+++ b/src/Catalog/SortingIdVersionCollector.cs
@@ -3,7 +3,6 @@
using System;
using System.Net.Http;
-using Newtonsoft.Json.Linq;
using NuGet.Services.Metadata.Catalog.Helpers;
namespace NuGet.Services.Metadata.Catalog
@@ -15,9 +14,9 @@ public SortingIdVersionCollector(Uri index, ITelemetryService telemetryService,
{
}
- protected override FeedPackageIdentity GetKey(JObject item)
+ protected override FeedPackageIdentity GetKey(CatalogCommitItem item)
{
- return new FeedPackageIdentity(item["nuget:id"].ToString(), item["nuget:version"].ToString());
+ return new FeedPackageIdentity(item.PackageIdentity);
}
}
-}
+}
\ No newline at end of file
diff --git a/src/Catalog/Strings.Designer.cs b/src/Catalog/Strings.Designer.cs
index 3a222b5de..0a51f5ae6 100644
--- a/src/Catalog/Strings.Designer.cs
+++ b/src/Catalog/Strings.Designer.cs
@@ -60,6 +60,24 @@ internal Strings() {
}
}
+ ///
+ /// Looks up a localized string similar to The argument must be an instance of type {0}..
+ ///
+ internal static string ArgumentMustBeInstanceOfType {
+ get {
+ return ResourceManager.GetString("ArgumentMustBeInstanceOfType", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The argument must not be null..
+ ///
+ internal static string ArgumentMustNotBeNull {
+ get {
+ return ResourceManager.GetString("ArgumentMustNotBeNull", resourceCulture);
+ }
+ }
+
///
/// Looks up a localized string similar to The argument must not be null, empty, or whitespace..
///
@@ -95,5 +113,32 @@ internal static string BatchProcessingFailure {
return ResourceManager.GetString("BatchProcessingFailure", resourceCulture);
}
}
+
+ ///
+ /// Looks up a localized string similar to Multiple commits exist with the same commit timestamp but different commit ID's: {0}..
+ ///
+ internal static string MultipleCommitIdsForSameCommitTimeStamp {
+ get {
+ return ResourceManager.GetString("MultipleCommitIdsForSameCommitTimeStamp", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The value of property '{0}' must be non-null and non-empty..
+ ///
+ internal static string NonEmptyPropertyValueRequired {
+ get {
+ return ResourceManager.GetString("NonEmptyPropertyValueRequired", resourceCulture);
+ }
+ }
+
+ ///
+ /// Looks up a localized string similar to The property '{0}' is required and its value must not be null..
+ ///
+ internal static string PropertyRequired {
+ get {
+ return ResourceManager.GetString("PropertyRequired", resourceCulture);
+ }
+ }
}
}
diff --git a/src/Catalog/Strings.resx b/src/Catalog/Strings.resx
index 81e1cc1ad..08c278616 100644
--- a/src/Catalog/Strings.resx
+++ b/src/Catalog/Strings.resx
@@ -117,6 +117,12 @@
System.Resources.ResXResourceWriter, System.Windows.Forms, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089
+
+ The argument must be an instance of type {0}.
+
+
+ The argument must not be null.
+
The argument must not be null, empty, or whitespace.
@@ -129,4 +135,13 @@
A failure occurred while processing a catalog batch.
+
+ Multiple commits exist with the same commit timestamp but different commit ID's: {0}.
+
+
+ The value of property '{0}' must be non-null and non-empty.
+
+
+ The property '{0}' is required and its value must not be null.
+
\ No newline at end of file
diff --git a/src/Catalog/Telemetry/TelemetryConstants.cs b/src/Catalog/Telemetry/TelemetryConstants.cs
index 3ba1be6b0..ed0a64de7 100644
--- a/src/Catalog/Telemetry/TelemetryConstants.cs
+++ b/src/Catalog/Telemetry/TelemetryConstants.cs
@@ -39,6 +39,7 @@ public static class TelemetryConstants
public const string ProcessPackageDetailsSeconds = "ProcessPackageDetailsSeconds";
public const string ProcessPackageVersionIndexSeconds = "ProcessPackageVersionIndexSeconds";
public const string RegistrationDeltaCount = "RegistrationDeltaCount";
+ public const string SizeInBytes = "SizeInBytes";
public const string StatusCode = "StatusCode";
public const string Success = "Success";
public const string Uri = "Uri";
diff --git a/src/Ng/Jobs/Catalog2DnxJob.cs b/src/Ng/Jobs/Catalog2DnxJob.cs
index 996951b63..6f9e7ac90 100644
--- a/src/Ng/Jobs/Catalog2DnxJob.cs
+++ b/src/Ng/Jobs/Catalog2DnxJob.cs
@@ -79,6 +79,8 @@ protected override void Init(IDictionary arguments, Cancellation
preferredPackageSourceStorageFactory);
Logger.LogInformation("HTTP client timeout: {Timeout}", httpClientTimeout);
+ MaxDegreeOfParallelism = 256;
+
_collector = new DnxCatalogCollector(
new Uri(source),
storageFactory,
diff --git a/src/Ng/Jobs/Catalog2RegistrationJob.cs b/src/Ng/Jobs/Catalog2RegistrationJob.cs
index 65e9e47ae..6ac270905 100644
--- a/src/Ng/Jobs/Catalog2RegistrationJob.cs
+++ b/src/Ng/Jobs/Catalog2RegistrationJob.cs
@@ -104,6 +104,7 @@ protected override void Init(IDictionary arguments, Cancellation
storageFactories.SemVer2StorageFactory,
contentBaseAddress == null ? null : new Uri(contentBaseAddress),
TelemetryService,
+ Logger,
CommandHelpers.GetHttpMessageHandlerFactory(TelemetryService, verbose));
var cursorStorage = storageFactories.LegacyStorageFactory.Create();
diff --git a/src/Ng/Jobs/Feed2CatalogJob.cs b/src/Ng/Jobs/Feed2CatalogJob.cs
index d76a72be4..adfe0d759 100644
--- a/src/Ng/Jobs/Feed2CatalogJob.cs
+++ b/src/Ng/Jobs/Feed2CatalogJob.cs
@@ -134,7 +134,7 @@ protected override async Task RunInternalAsync(CancellationToken cancellationTok
packagesEdited = 0;
// baseline timestamps
- var catalogProperties = await FeedHelpers.GetCatalogPropertiesAsync(CatalogStorage, TelemetryService, cancellationToken);
+ var catalogProperties = await CatalogProperties.ReadAsync(CatalogStorage, TelemetryService, cancellationToken);
var lastCreated = catalogProperties.LastCreated ?? (StartDate ?? Constants.DateTimeMinValueUtc);
var lastEdited = catalogProperties.LastEdited ?? lastCreated;
var lastDeleted = catalogProperties.LastDeleted ?? lastCreated;
diff --git a/src/Ng/Jobs/MonitoringProcessorJob.cs b/src/Ng/Jobs/MonitoringProcessorJob.cs
index 815e62169..167b50dd1 100644
--- a/src/Ng/Jobs/MonitoringProcessorJob.cs
+++ b/src/Ng/Jobs/MonitoringProcessorJob.cs
@@ -129,15 +129,31 @@ private async Task HandleQueueMessageAsync(
// We can remove the message from the queue because it was processed.
messageWasProcessed = true;
}
- catch (Exception e)
+ catch (Exception validationFailedToRunException)
{
- // Validations failed to run! Save this failed status to storage.
- await SaveFailedPackageMonitoringStatusAsync(queuedContext, e, token);
- // We can then remove the message from the queue because this failed status can be used to requeue the message.
- messageWasProcessed = true;
+ try
+ {
+ // Validations failed to run! Save this failed status to storage.
+ await SaveFailedPackageMonitoringStatusAsync(queuedContext, validationFailedToRunException, token);
+ // We can then remove the message from the queue because this failed status can be used to requeue the message.
+ messageWasProcessed = true;
+ }
+ catch (Exception failedValidationSaveFailureException)
+ {
+ // We failed to run validations and failed to save the failed validation!
+ // We were not able to process this message. We need to log the exceptions so we can debug the issue.
+ var aggregateException = new AggregateException(
+ "Validations failed to run and saving unsuccessful validation failed!",
+ new[] { validationFailedToRunException, failedValidationSaveFailureException });
+
+ Logger.LogCritical(
+ NuGet.Services.Metadata.Catalog.Monitoring.LogEvents.QueueMessageFatalFailure,
+ aggregateException,
+ "Failed to process queue message");
+ }
}
- // Note that if both validations fail and saving the failure status fail, we cannot remove the message from the queue.
+ // If we failed to run validations and failed to save the failed validation, we cannot remove the message from the queue.
if (messageWasProcessed)
{
await _queue.RemoveAsync(queueMessage, token);
@@ -207,6 +223,7 @@ private async Task> FetchCatalogIndexEntriesFromR
var catalogPageUri = new Uri(leafBlob["@id"].ToString());
var catalogPage = await _client.GetJObjectAsync(catalogPageUri, token);
+
return new CatalogIndexEntry[]
{
new CatalogIndexEntry(
@@ -214,8 +231,7 @@ private async Task> FetchCatalogIndexEntriesFromR
Schema.DataTypes.PackageDetails.ToString(),
catalogPage["catalog:commitId"].ToString(),
DateTime.Parse(catalogPage["catalog:commitTimeStamp"].ToString()),
- id,
- version)
+ new PackageIdentity(id, version))
};
}
diff --git a/src/Ng/Jobs/NgJob.cs b/src/Ng/Jobs/NgJob.cs
index 973c2069b..9d65c0a5d 100644
--- a/src/Ng/Jobs/NgJob.cs
+++ b/src/Ng/Jobs/NgJob.cs
@@ -17,7 +17,7 @@ public abstract class NgJob
protected ILoggerFactory LoggerFactory;
protected ILogger Logger;
- protected int MaxDegreeOfParallelism { get; }
+ protected int MaxDegreeOfParallelism { get; set; }
protected NgJob(ITelemetryService telemetryService, ILoggerFactory loggerFactory)
{
diff --git a/src/Ng/Jobs/Package2CatalogJob.cs b/src/Ng/Jobs/Package2CatalogJob.cs
index 357099d05..561f8ad7f 100644
--- a/src/Ng/Jobs/Package2CatalogJob.cs
+++ b/src/Ng/Jobs/Package2CatalogJob.cs
@@ -91,7 +91,7 @@ protected override async Task RunInternalAsync(CancellationToken cancellationTok
Logger.LogInformation($"Downloading {packages.Select(t => t.Value.Count).Sum()} packages");
// the idea here is to leave the lastCreated, lastEdited and lastDeleted values exactly as they were
- var catalogProperties = await FeedHelpers.GetCatalogPropertiesAsync(_storage, TelemetryService, cancellationToken);
+ var catalogProperties = await CatalogProperties.ReadAsync(_storage, TelemetryService, cancellationToken);
var lastCreated = catalogProperties.LastCreated ?? DateTime.MinValue.ToUniversalTime();
var lastEdited = catalogProperties.LastEdited ?? DateTime.MinValue.ToUniversalTime();
var lastDeleted = catalogProperties.LastDeleted ?? DateTime.MinValue.ToUniversalTime();
diff --git a/src/Ng/SearchIndexFromCatalogCollector.cs b/src/Ng/SearchIndexFromCatalogCollector.cs
index 2fa33ff1c..e53477afd 100644
--- a/src/Ng/SearchIndexFromCatalogCollector.cs
+++ b/src/Ng/SearchIndexFromCatalogCollector.cs
@@ -40,8 +40,9 @@ public SearchIndexFromCatalogCollector(
string baseAddress,
ITelemetryService telemetryService,
ILogger logger,
- Func handlerFunc = null)
- : base(index, telemetryService, handlerFunc)
+ Func handlerFunc = null,
+ IHttpRetryStrategy httpRetryStrategy = null)
+ : base(index, telemetryService, handlerFunc, httpRetryStrategy: httpRetryStrategy)
{
_indexWriter = indexWriter;
_commitEachBatch = commitEachBatch;
@@ -50,7 +51,13 @@ public SearchIndexFromCatalogCollector(
_logger = logger;
}
- protected override async Task OnProcessBatchAsync(CollectorHttpClient client, IEnumerable items, JToken context, DateTime commitTimeStamp, bool isLastBatch, CancellationToken cancellationToken)
+ protected override async Task OnProcessBatchAsync(
+ CollectorHttpClient client,
+ IEnumerable items,
+ JToken context,
+ DateTime commitTimeStamp,
+ bool isLastBatch,
+ CancellationToken cancellationToken)
{
JObject catalogIndex = null;
if (_baseAddress != null)
@@ -152,16 +159,14 @@ private async Task CommitIndexAsync()
private static async Task> FetchCatalogItemsAsync(
CollectorHttpClient client,
- IEnumerable items,
+ IEnumerable items,
CancellationToken cancellationToken)
{
- IList> tasks = new List>();
+ var tasks = new List>();
- foreach (JToken item in items)
+ foreach (var item in items)
{
- Uri catalogItemUri = item["@id"].ToObject();
-
- tasks.Add(client.GetJObjectAsync(catalogItemUri, cancellationToken));
+ tasks.Add(client.GetJObjectAsync(item.Uri, cancellationToken));
}
await Task.WhenAll(tasks);
diff --git a/src/NuGet.Protocol.Catalog/CatalogClient.cs b/src/NuGet.Protocol.Catalog/CatalogClient.cs
new file mode 100644
index 000000000..b03686ed9
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/CatalogClient.cs
@@ -0,0 +1,113 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO;
+using System.Net.Http;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogClient : ICatalogClient
+ {
+ private static readonly JsonSerializer JsonSerializer = CatalogJsonSerialization.Serializer;
+ private readonly HttpClient _httpClient;
+ private readonly ILogger _logger;
+
+ public CatalogClient(HttpClient httpClient, ILogger logger)
+ {
+ _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+
+ public Task GetIndexAsync(string indexUrl)
+ {
+ return DeserializeUrlAsync(indexUrl);
+ }
+
+ public Task GetPageAsync(string pageUrl)
+ {
+ return DeserializeUrlAsync(pageUrl);
+ }
+
+ public async Task GetLeafAsync(string leafUrl)
+ {
+ // Buffer all of the JSON so we can parse it twice: once to determine the leaf type and once to deserialize
+ // the entire thing to the proper leaf type.
+ _logger.LogDebug("Downloading {leafUrl} as a byte array.", leafUrl);
+ var jsonBytes = await _httpClient.GetByteArrayAsync(leafUrl);
+ var untypedLeaf = DeserializeBytes(jsonBytes);
+
+ switch (untypedLeaf.Type)
+ {
+ case CatalogLeafType.PackageDetails:
+ return DeserializeBytes(jsonBytes);
+ case CatalogLeafType.PackageDelete:
+ return DeserializeBytes(jsonBytes);
+ default:
+ throw new NotSupportedException($"The catalog leaf type '{untypedLeaf.Type}' is not supported.");
+ }
+ }
+
+ private async Task GetLeafAsync(CatalogLeafType type, string leafUrl)
+ {
+ switch (type)
+ {
+ case CatalogLeafType.PackageDetails:
+ return await GetPackageDetailsLeafAsync(leafUrl);
+ case CatalogLeafType.PackageDelete:
+ return await GetPackageDeleteLeafAsync(leafUrl);
+ default:
+ throw new NotSupportedException($"The catalog leaf type '{type}' is not supported.");
+ }
+ }
+
+ public Task GetPackageDeleteLeafAsync(string leafUrl)
+ {
+ return GetAndValidateLeafAsync(CatalogLeafType.PackageDelete, leafUrl);
+ }
+
+ public Task GetPackageDetailsLeafAsync(string leafUrl)
+ {
+ return GetAndValidateLeafAsync(CatalogLeafType.PackageDetails, leafUrl);
+ }
+
+ private async Task GetAndValidateLeafAsync(CatalogLeafType type, string leafUrl) where T : CatalogLeaf
+ {
+ var leaf = await DeserializeUrlAsync(leafUrl);
+
+ if (leaf.Type != type)
+ {
+ throw new ArgumentException(
+ $"The leaf type found in the document does not match the expected '{type}' type.",
+ nameof(type));
+ }
+
+ return leaf;
+ }
+
+ private T DeserializeBytes(byte[] jsonBytes)
+ {
+ using (var stream = new MemoryStream(jsonBytes))
+ using (var textReader = new StreamReader(stream))
+ using (var jsonReader = new JsonTextReader(textReader))
+ {
+ return JsonSerializer.Deserialize(jsonReader);
+ }
+ }
+
+ private async Task DeserializeUrlAsync(string documentUrl)
+ {
+ _logger.LogDebug("Downloading {documentUrl} as a stream.", documentUrl);
+
+ using (var stream = await _httpClient.GetStreamAsync(documentUrl))
+ using (var textReader = new StreamReader(stream))
+ using (var jsonReader = new JsonTextReader(textReader))
+ {
+ return JsonSerializer.Deserialize(jsonReader);
+ }
+ }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/CatalogProcessor.cs b/src/NuGet.Protocol.Catalog/CatalogProcessor.cs
new file mode 100644
index 000000000..e798f969a
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/CatalogProcessor.cs
@@ -0,0 +1,218 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using NuGet.Protocol.Core.Types;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogProcessor
+ {
+ private const string CatalogResourceType = "Catalog/3.0.0"; // Service index resource type used to find the catalog index URL.
+ private readonly ICatalogLeafProcessor _leafProcessor;
+ private readonly ICatalogClient _client;
+ private readonly ICursor _cursor;
+ private readonly ILogger _logger;
+ private readonly CatalogProcessorSettings _settings;
+ /// <summary>Creates a processor from its collaborators. Every argument is required.</summary>
+ public CatalogProcessor(
+ ICursor cursor,
+ ICatalogClient client,
+ ICatalogLeafProcessor leafProcessor,
+ CatalogProcessorSettings settings,
+ ILogger logger)
+ {
+ _leafProcessor = leafProcessor ?? throw new ArgumentNullException(nameof(leafProcessor));
+ _client = client ?? throw new ArgumentNullException(nameof(client));
+ _cursor = cursor ?? throw new ArgumentNullException(nameof(cursor));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ // The settings get their own checks because a null ServiceIndexUrl inside them is rejected as well.
+ if (settings == null)
+ {
+ throw new ArgumentNullException(nameof(settings));
+ }
+
+ if (settings.ServiceIndexUrl == null)
+ {
+ throw new ArgumentException(
+ $"The {nameof(CatalogProcessorSettings.ServiceIndexUrl)} property of the " +
+ $"{nameof(CatalogProcessorSettings)} must not be null.",
+ nameof(settings));
+ }
+
+ // Clone the settings so that later changes to the caller's instance cannot affect this processor.
+ _settings = settings.Clone();
+ }
+
+ /// <summary>
+ /// Discovers and downloads all of the catalog leaves after the current cursor value, up to and including the
+ /// maximum commit timestamp found in the settings. Each catalog leaf is passed to the catalog leaf processor in
+ /// chronological order. After a commit is completed, its commit timestamp is written to the cursor, i.e. when
+ /// transitioning from commit timestamp A to B, A is written to the cursor so that it never is processed again.
+ /// </summary>
+ /// <returns>True if all of the catalog leaves found were processed successfully.</returns>
+ public async Task<bool> ProcessAsync()
+ {
+ var catalogIndexUrl = await GetCatalogIndexUrlAsync();
+
+ var minCommitTimestamp = await GetMinCommitTimestamp();
+ _logger.LogInformation(
+ "Using time bounds {min:O} (exclusive) to {max:O} (inclusive).",
+ minCommitTimestamp,
+ _settings.MaxCommitTimestamp);
+
+ return await ProcessIndexAsync(catalogIndexUrl, minCommitTimestamp);
+ }
+
+ /// <summary>Processes the in-bounds pages of the catalog index in order, stopping at the first page that fails.</summary>
+ private async Task<bool> ProcessIndexAsync(string catalogIndexUrl, DateTimeOffset minCommitTimestamp)
+ {
+ var index = await _client.GetIndexAsync(catalogIndexUrl);
+ var pageItems = index.GetPagesInBounds(
+ minCommitTimestamp,
+ _settings.MaxCommitTimestamp);
+ _logger.LogInformation(
+ "{pages} pages were in the time bounds, out of {totalPages}.",
+ pageItems.Count,
+ index.Items.Count);
+
+ var success = true;
+ for (var i = 0; i < pageItems.Count; i++)
+ {
+ success = await ProcessPageAsync(minCommitTimestamp, pageItems[i]);
+ if (!success)
+ {
+ _logger.LogWarning(
+ "{unprocessedPages} out of {pages} pages were left incomplete due to a processing failure.",
+ pageItems.Count - i,
+ pageItems.Count);
+ break;
+ }
+ }
+
+ return success;
+ }
+
+ /// <summary>Processes the in-bounds leaves of one page in commit order, advancing the cursor as commits complete.</summary>
+ private async Task<bool> ProcessPageAsync(DateTimeOffset minCommitTimestamp, CatalogPageItem pageItem)
+ {
+ var page = await _client.GetPageAsync(pageItem.Url);
+ var leafItems = page.GetLeavesInBounds(
+ minCommitTimestamp,
+ _settings.MaxCommitTimestamp,
+ _settings.ExcludeRedundantLeaves);
+ _logger.LogInformation(
+ "On page {page}, {leaves} out of {totalLeaves} were in the time bounds.",
+ pageItem.Url,
+ leafItems.Count,
+ page.Items.Count);
+
+ DateTimeOffset? newCursor = null;
+ var success = true;
+ for (var i = 0; i < leafItems.Count; i++)
+ {
+ var leafItem = leafItems[i];
+ // Reaching a different commit timestamp means the previous commit is complete, so persist it.
+ if (newCursor.HasValue && newCursor.Value != leafItem.CommitTimestamp)
+ {
+ await _cursor.SetAsync(newCursor.Value);
+ }
+
+ newCursor = leafItem.CommitTimestamp;
+
+ success = await ProcessLeafAsync(leafItem);
+ if (!success)
+ {
+ _logger.LogWarning(
+ "{unprocessedLeaves} out of {leaves} leaves were left incomplete due to a processing failure.",
+ leafItems.Count - i,
+ leafItems.Count);
+ break;
+ }
+ }
+ // The last commit seen is only persisted when no leaf failed; otherwise it is left to be retried.
+ if (newCursor.HasValue && success)
+ {
+ await _cursor.SetAsync(newCursor.Value);
+ }
+
+ return success;
+ }
+
+ /// <summary>Downloads one leaf and hands it to the leaf processor; any exception is logged and reported as a failure.</summary>
+ private async Task<bool> ProcessLeafAsync(CatalogLeafItem leafItem)
+ {
+ bool success;
+ try
+ {
+ switch (leafItem.Type)
+ {
+ case CatalogLeafType.PackageDelete:
+ var packageDelete = await _client.GetPackageDeleteLeafAsync(leafItem.Url);
+ success = await _leafProcessor.ProcessPackageDeleteAsync(packageDelete);
+ break;
+ case CatalogLeafType.PackageDetails:
+ var packageDetails = await _client.GetPackageDetailsLeafAsync(leafItem.Url);
+ success = await _leafProcessor.ProcessPackageDetailsAsync(packageDetails);
+ break;
+ default:
+ throw new NotSupportedException($"The catalog leaf type '{leafItem.Type}' is not supported.");
+ }
+ }
+ catch (Exception exception) // Also catches the NotSupportedException thrown just above.
+ {
+ _logger.LogError(
+ 0,
+ exception,
+ "An exception was thrown while processing leaf {leafUrl}.",
+ leafItem.Url);
+ success = false;
+ }
+ if (!success)
+ {
+ _logger.LogWarning(
+ "Failed to process leaf {leafUrl} ({packageId} {packageVersion}, {leafType}).",
+ leafItem.Url,
+ leafItem.PackageId,
+ leafItem.PackageVersion,
+ leafItem.Type);
+ }
+
+ return success;
+ }
+
+ /// <summary>Resolves the exclusive lower time bound, never returning less than the configured absolute minimum.</summary>
+ private async Task<DateTimeOffset> GetMinCommitTimestamp()
+ {
+ var minCommitTimestamp = await _cursor.GetAsync();
+ // Fall back in order: saved cursor value, configured default, absolute minimum.
+ minCommitTimestamp = minCommitTimestamp
+ ?? _settings.DefaultMinCommitTimestamp
+ ?? _settings.MinCommitTimestamp;
+ if (minCommitTimestamp.Value < _settings.MinCommitTimestamp)
+ {
+ minCommitTimestamp = _settings.MinCommitTimestamp;
+ }
+
+ return minCommitTimestamp.Value;
+ }
+
+ /// <summary>Looks up the catalog index URL advertised by the configured service index.</summary>
+ private async Task<string> GetCatalogIndexUrlAsync()
+ {
+ _logger.LogInformation("Getting catalog index URL from {serviceIndexUrl}.", _settings.ServiceIndexUrl);
+ var sourceRepository = Repository.Factory.GetCoreV3(_settings.ServiceIndexUrl, FeedType.HttpV3);
+ var serviceIndexResource = await sourceRepository.GetResourceAsync<ServiceIndexResourceV3>();
+ var catalogIndexUrl = serviceIndexResource.GetServiceEntryUri(CatalogResourceType)?.AbsoluteUri;
+ if (catalogIndexUrl == null)
+ {
+ throw new InvalidOperationException(
+ $"The service index does not contain resource '{CatalogResourceType}'.");
+ }
+
+ return catalogIndexUrl;
+ }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/CatalogProcessorSettings.cs b/src/NuGet.Protocol.Catalog/CatalogProcessorSettings.cs
new file mode 100644
index 000000000..7c61a1836
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/CatalogProcessorSettings.cs
@@ -0,0 +1,61 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// Settings for how <see cref="CatalogProcessor"/> should behave. Defaults to processing all catalog items on
+ /// nuget.org's V3 service index.
+ /// </summary>
+ /// <remarks>
+ /// Every property has a default, so a freshly constructed instance is immediately usable.
+ /// </remarks>
+ public class CatalogProcessorSettings
+ {
+ /// <summary>
+ /// The service index to discover the catalog index URL.
+ /// Defaults to <c>https://api.nuget.org/v3/index.json</c>.
+ /// </summary>
+ public string ServiceIndexUrl { get; set; } = "https://api.nuget.org/v3/index.json";
+
+ /// <summary>
+ /// The minimum commit timestamp to use when no cursor value has been saved.
+ /// Defaults to null.
+ /// </summary>
+ public DateTimeOffset? DefaultMinCommitTimestamp { get; set; } = null;
+
+ /// <summary>
+ /// The absolute minimum (exclusive) commit timestamp to process in the catalog.
+ /// </summary>
+ public DateTimeOffset MinCommitTimestamp { get; set; } = DateTimeOffset.MinValue;
+
+ /// <summary>
+ /// The absolute maximum (inclusive) commit timestamp to process in the catalog.
+ /// </summary>
+ public DateTimeOffset MaxCommitTimestamp { get; set; } = DateTimeOffset.MaxValue;
+
+ /// <summary>
+ /// If multiple catalog leaves are found in a page concerning the same package ID and version, only the latest
+ /// is processed.
+ /// </summary>
+ public bool ExcludeRedundantLeaves { get; set; } = true;
+
+ /// <summary>
+ /// Produces an independent copy, so that later changes to this instance do not reach a holder of the copy.
+ /// </summary>
+ /// <returns>A new instance carrying the same five property values.</returns>
+ internal CatalogProcessorSettings Clone()
+ {
+ var copy = new CatalogProcessorSettings();
+ copy.ServiceIndexUrl = ServiceIndexUrl;
+ copy.DefaultMinCommitTimestamp = DefaultMinCommitTimestamp;
+ copy.MinCommitTimestamp = MinCommitTimestamp;
+ copy.MaxCommitTimestamp = MaxCommitTimestamp;
+ copy.ExcludeRedundantLeaves = ExcludeRedundantLeaves;
+
+ return copy;
+ }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/FileCursor.cs b/src/NuGet.Protocol.Catalog/FileCursor.cs
new file mode 100644
index 000000000..75284fed7
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/FileCursor.cs
@@ -0,0 +1,58 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// A cursor implementation which stores the cursor in a local file. The cursor value is written to the file as
+ /// a JSON object.
+ /// </summary>
+ public class FileCursor : ICursor
+ {
+ private static readonly JsonSerializerSettings Settings = CatalogJsonSerialization.Settings;
+ private readonly string _path;
+ private readonly ILogger _logger;
+ /// <summary>Creates a cursor that is persisted in the file at <paramref name="path"/>.</summary>
+ public FileCursor(string path, ILogger logger)
+ {
+ _path = path ?? throw new ArgumentNullException(nameof(path));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ }
+ /// <summary>Reads the cursor; yields null when the file is missing, is not valid JSON, or holds no JSON object.</summary>
+ public Task<DateTimeOffset?> GetAsync()
+ {
+ try
+ {
+ // An empty file or a literal JSON null deserializes to a null object instead of throwing.
+ var data = JsonConvert.DeserializeObject<Data>(File.ReadAllText(_path), Settings);
+ _logger.LogDebug("Read cursor value {cursor:O} from {path}.", data?.Value, _path);
+ return Task.FromResult<DateTimeOffset?>(data?.Value);
+ }
+ catch (Exception e) when (e is FileNotFoundException || e is JsonException)
+ {
+ return Task.FromResult<DateTimeOffset?>(null);
+ }
+ }
+ /// <summary>Overwrites the file with <paramref name="value"/> serialized as a JSON object.</summary>
+ public Task SetAsync(DateTimeOffset value)
+ {
+ var data = new Data { Value = value };
+ var jsonString = JsonConvert.SerializeObject(data);
+ File.WriteAllText(_path, jsonString);
+ _logger.LogDebug("Wrote cursor value {cursor:O} to {path}.", data.Value, _path);
+ return Task.CompletedTask;
+ }
+ // Shape of the persisted JSON document: a single "value" property.
+ private class Data
+ {
+ [JsonProperty("value")]
+ public DateTimeOffset Value { get; set; }
+ }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/ICatalogClient.cs b/src/NuGet.Protocol.Catalog/ICatalogClient.cs
new file mode 100644
index 000000000..4ecbee386
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/ICatalogClient.cs
@@ -0,0 +1,54 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Threading.Tasks;
+
+namespace NuGet.Protocol.Catalog
+{
+ public interface ICatalogClient
+ {
+ /// <summary>
+ /// Get the catalog index at the provided URL. The catalog index URL should be discovered from the
+ /// service index.
+ /// </summary>
+ /// <param name="indexUrl">The catalog index URL.</param>
+ /// <returns>The catalog index.</returns>
+ Task<CatalogIndex> GetIndexAsync(string indexUrl);
+
+ /// <summary>
+ /// Get the catalog page at the provided URL. The catalog page URL should be discovered from the catalog
+ /// index.
+ /// </summary>
+ /// <param name="pageUrl">The catalog page URL.</param>
+ /// <returns>The catalog page.</returns>
+ Task<CatalogPage> GetPageAsync(string pageUrl);
+
+ /// <summary>
+ /// Gets the catalog leaf at the provided URL. The catalog leaf URL should be discovered from a catalog page.
+ /// The type of the catalog leaf is automatically determined from the fetched document.
+ /// </summary>
+ /// <param name="leafUrl">The catalog leaf URL.</param>
+ /// <returns>The catalog leaf.</returns>
+ Task<CatalogLeaf> GetLeafAsync(string leafUrl);
+
+ /// <summary>
+ /// Gets the catalog leaf at the provided URL. The catalog leaf URL should be discovered from a catalog page.
+ /// The type of the catalog leaf must be a package delete. If the actual document is not a package delete, an
+ /// exception is thrown.
+ /// </summary>
+ /// <param name="leafUrl">The catalog leaf URL.</param>
+ /// <exception cref="System.ArgumentException">Thrown if the actual document is not a package delete.</exception>
+ /// <returns>The catalog leaf.</returns>
+ Task<PackageDeleteCatalogLeaf> GetPackageDeleteLeafAsync(string leafUrl);
+
+ /// <summary>
+ /// Gets the catalog leaf at the provided URL. The catalog leaf URL should be discovered from a catalog page.
+ /// The type of the catalog leaf must be package details. If the actual document is not package details, an
+ /// exception is thrown.
+ /// </summary>
+ /// <param name="leafUrl">The catalog leaf URL.</param>
+ /// <exception cref="System.ArgumentException">Thrown if the actual document is not package details.</exception>
+ /// <returns>The catalog leaf.</returns>
+ Task<PackageDetailsCatalogLeaf> GetPackageDetailsLeafAsync(string leafUrl);
+ }
+}
\ No newline at end of file
diff --git a/src/NuGet.Protocol.Catalog/ICatalogLeafProcessor.cs b/src/NuGet.Protocol.Catalog/ICatalogLeafProcessor.cs
new file mode 100644
index 000000000..74b1f9733
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/ICatalogLeafProcessor.cs
@@ -0,0 +1,36 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Threading.Tasks;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// An interface which allows custom processing of catalog leaves. This interface should be implemented when the
+ /// catalog leaf documents need to be downloaded and processed in chronological order.
+ /// </summary>
+ public interface ICatalogLeafProcessor
+ {
+ /// <summary>
+ /// Process a catalog leaf containing package details. This method should return false or throw an exception
+ /// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor"/> will stop
+ /// processing items. Note that the same package ID/version combination can be passed to this multiple times,
+ /// for example due to an edit in the package metadata or due to a transient error and retry on the part of the
+ /// <see cref="CatalogProcessor"/>.
+ /// </summary>
+ /// <param name="leaf">The leaf document.</param>
+ /// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
+ Task<bool> ProcessPackageDetailsAsync(PackageDetailsCatalogLeaf leaf);
+
+ /// <summary>
+ /// Process a catalog leaf containing a package delete. This method should return false or throw an exception
+ /// if the catalog leaf cannot be processed. In this case, the <see cref="CatalogProcessor"/> will stop
+ /// processing items. Note that the same package ID/version combination can be passed to this multiple times,
+ /// for example due to a package being deleted again due to a transient error and retry on the part of the
+ /// <see cref="CatalogProcessor"/>.
+ /// </summary>
+ /// <param name="leaf">The leaf document.</param>
+ /// <returns>True, if the leaf was successfully processed. False, otherwise.</returns>
+ Task<bool> ProcessPackageDeleteAsync(PackageDeleteCatalogLeaf leaf);
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/ICursor.cs b/src/NuGet.Protocol.Catalog/ICursor.cs
new file mode 100644
index 000000000..241699b52
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/ICursor.cs
@@ -0,0 +1,27 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Threading.Tasks;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// An interface which allows reading and writing a cursor value. The value is up to what point in the catalog
+ /// has been successfully processed. The value is a catalog commit timestamp.
+ /// </summary>
+ public interface ICursor
+ {
+ /// <summary>
+ /// Get the value of the cursor.
+ /// </summary>
+ /// <returns>The cursor value. Null if the cursor has no value yet.</returns>
+ Task<DateTimeOffset?> GetAsync();
+
+ /// <summary>
+ /// Set the value of the cursor.
+ /// </summary>
+ /// <param name="value">The new cursor value.</param>
+ Task SetAsync(DateTimeOffset value);
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogIndex.cs b/src/NuGet.Protocol.Catalog/Models/CatalogIndex.cs
new file mode 100644
index 000000000..eb69b380a
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogIndex.cs
@@ -0,0 +1,21 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogIndex // The catalog index document, whose items are the catalog's pages.
+ {
+ [JsonProperty("commitTimeStamp")]
+ public DateTimeOffset CommitTimestamp { get; set; }
+ // Bound to the JSON "count" property; its meaning is defined by the catalog API, not by this class.
+ [JsonProperty("count")]
+ public int Count { get; set; }
+ // The page entries; GetPagesInBounds filters and orders these by commit timestamp.
+ [JsonProperty("items")]
+ public List<CatalogPageItem> Items { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogLeaf.cs b/src/NuGet.Protocol.Catalog/Models/CatalogLeaf.cs
new file mode 100644
index 000000000..e85ed4c34
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogLeaf.cs
@@ -0,0 +1,27 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogLeaf : ICatalogLeafItem // Base class of the package details and package delete leaf documents.
+ {
+ [JsonProperty("@type")]
+ [JsonConverter(typeof(CatalogLeafTypeConverter))]
+ public CatalogLeafType Type { get; set; }
+ // In a leaf document the commit timestamp is stored under the prefixed name "catalog:commitTimeStamp".
+ [JsonProperty("catalog:commitTimeStamp")]
+ public DateTimeOffset CommitTimestamp { get; set; }
+ // The package ID, stored under the plain JSON name "id".
+ [JsonProperty("id")]
+ public string PackageId { get; set; }
+ // For package details leaves, a published year of 1900 is the legacy marker for an unlisted package.
+ [JsonProperty("published")]
+ public DateTimeOffset Published { get; set; }
+ // The version as a raw string; ParsePackageVersion turns it into a NuGetVersion.
+ [JsonProperty("version")]
+ public string PackageVersion { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogLeafItem.cs b/src/NuGet.Protocol.Catalog/Models/CatalogLeafItem.cs
new file mode 100644
index 000000000..1d6e42ca1
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogLeafItem.cs
@@ -0,0 +1,27 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogLeafItem : ICatalogLeafItem // One entry of a catalog page, pointing at a leaf document.
+ {
+ [JsonProperty("@id")]
+ public string Url { get; set; } // Passed to the catalog client to download the leaf document.
+ // Uses a different converter than CatalogLeaf.Type does.
+ [JsonProperty("@type")]
+ [JsonConverter(typeof(CatalogLeafItemTypeConverter))]
+ public CatalogLeafType Type { get; set; }
+ // Compared against the time bounds when a page's leaves are filtered.
+ [JsonProperty("commitTimeStamp")]
+ public DateTimeOffset CommitTimestamp { get; set; }
+ // In a page item the package ID is stored under the prefixed name "nuget:id".
+ [JsonProperty("nuget:id")]
+ public string PackageId { get; set; }
+ // The version as a raw string; ParsePackageVersion turns it into a NuGetVersion.
+ [JsonProperty("nuget:version")]
+ public string PackageVersion { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogLeafType.cs b/src/NuGet.Protocol.Catalog/Models/CatalogLeafType.cs
new file mode 100644
index 000000000..d23ef3d20
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogLeafType.cs
@@ -0,0 +1,12 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+namespace NuGet.Protocol.Catalog
+{
+ public enum CatalogLeafType // The kinds of catalog leaf this library can process.
+ {
+ PackageDetails = 1,
+ // Values start at 1, so default(CatalogLeafType) is not a defined member.
+ PackageDelete = 2,
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogPage.cs b/src/NuGet.Protocol.Catalog/Models/CatalogPage.cs
new file mode 100644
index 000000000..ab7538238
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogPage.cs
@@ -0,0 +1,24 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogPage // A catalog page document, whose items point at catalog leaves.
+ {
+ [JsonProperty("commitTimeStamp")]
+ public DateTimeOffset CommitTimestamp { get; set; }
+ // Bound to the JSON "count" property; its meaning is defined by the catalog API, not by this class.
+ [JsonProperty("count")]
+ public int Count { get; set; }
+ // The leaf entries; GetLeavesInBounds filters and orders these.
+ [JsonProperty("items")]
+ public List<CatalogLeafItem> Items { get; set; }
+ // Bound to the JSON "parent" property. NOTE(review): presumably the URL of the catalog index; confirm.
+ [JsonProperty("parent")]
+ public string Parent { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/CatalogPageItem.cs b/src/NuGet.Protocol.Catalog/Models/CatalogPageItem.cs
new file mode 100644
index 000000000..4677d5682
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/CatalogPageItem.cs
@@ -0,0 +1,20 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class CatalogPageItem // One entry of the catalog index, pointing at a page document.
+ {
+ [JsonProperty("@id")]
+ public string Url { get; set; } // Passed to the catalog client to download the page.
+ // Compared against the time bounds when the index's pages are selected.
+ [JsonProperty("commitTimeStamp")]
+ public DateTimeOffset CommitTimestamp { get; set; }
+ // Bound to the JSON "count" property; its meaning is defined by the catalog API, not by this class.
+ [JsonProperty("count")]
+ public int Count { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/ICatalogLeafItem.cs b/src/NuGet.Protocol.Catalog/Models/ICatalogLeafItem.cs
new file mode 100644
index 000000000..6b80064cf
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/ICatalogLeafItem.cs
@@ -0,0 +1,15 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+
+namespace NuGet.Protocol.Catalog
+{
+ public interface ICatalogLeafItem // Read-only view shared by page entries (CatalogLeafItem) and leaf documents (CatalogLeaf).
+ {
+ DateTimeOffset CommitTimestamp { get; }
+ string PackageId { get; }
+ string PackageVersion { get; } // Raw version string; ParsePackageVersion parses it.
+ CatalogLeafType Type { get; }
+ }
+}
\ No newline at end of file
diff --git a/src/NuGet.Protocol.Catalog/Models/ModelExtensions.cs b/src/NuGet.Protocol.Catalog/Models/ModelExtensions.cs
new file mode 100644
index 000000000..86a57a919
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/ModelExtensions.cs
@@ -0,0 +1,170 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using NuGet.Frameworks;
+using NuGet.Packaging.Core;
+using NuGet.Versioning;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// These are documented interpretations of values returned by the catalog API.
+ /// </summary>
+ public static class ModelExtensions
+ {
+ /// <summary>
+ /// Gets the leaves that lie within the provided commit timestamp bounds. The result is sorted by commit
+ /// timestamp, then package ID, then package version (SemVer order).
+ /// </summary>
+ /// <param name="catalogPage">The catalog page whose leaf items are filtered.</param>
+ /// <param name="minCommitTimestamp">The exclusive lower time bound on <see cref="CatalogLeafItem.CommitTimestamp"/>.</param>
+ /// <param name="maxCommitTimestamp">The inclusive upper time bound on <see cref="CatalogLeafItem.CommitTimestamp"/>.</param>
+ /// <param name="excludeRedundantLeaves">Only show the latest leaf concerning each package.</param>
+ public static List<CatalogLeafItem> GetLeavesInBounds(
+ this CatalogPage catalogPage,
+ DateTimeOffset minCommitTimestamp,
+ DateTimeOffset maxCommitTimestamp,
+ bool excludeRedundantLeaves)
+ {
+ var leaves = catalogPage
+ .Items
+ .Where(x => x.CommitTimestamp > minCommitTimestamp && x.CommitTimestamp <= maxCommitTimestamp)
+ .OrderBy(x => x.CommitTimestamp);
+ // The input is already time-ordered, so the last leaf of each package identity is the latest one.
+ if (excludeRedundantLeaves)
+ {
+ leaves = leaves
+ .GroupBy(x => new PackageIdentity(x.PackageId, x.ParsePackageVersion()))
+ .Select(x => x.Last())
+ .OrderBy(x => x.CommitTimestamp);
+ }
+
+ return leaves
+ .ThenBy(x => x.PackageId, StringComparer.OrdinalIgnoreCase)
+ .ThenBy(x => x.ParsePackageVersion())
+ .ToList();
+ }
+
+ /// <summary>
+ /// Gets the pages that may have catalog leaves within the provided commit timestamp bounds. The result is
+ /// sorted by commit timestamp.
+ /// </summary>
+ /// <param name="catalogIndex">The catalog index to fetch pages from.</param>
+ /// <param name="minCommitTimestamp">The exclusive lower time bound on <see cref="CatalogPageItem.CommitTimestamp"/>.</param>
+ /// <param name="maxCommitTimestamp">The inclusive upper time bound on <see cref="CatalogPageItem.CommitTimestamp"/>.</param>
+ public static List<CatalogPageItem> GetPagesInBounds(
+ this CatalogIndex catalogIndex,
+ DateTimeOffset minCommitTimestamp,
+ DateTimeOffset maxCommitTimestamp)
+ {
+ return catalogIndex
+ .GetPagesInBoundsLazy(minCommitTimestamp, maxCommitTimestamp)
+ .ToList();
+ }
+
+ private static IEnumerable<CatalogPageItem> GetPagesInBoundsLazy(
+ this CatalogIndex catalogIndex,
+ DateTimeOffset minCommitTimestamp,
+ DateTimeOffset maxCommitTimestamp)
+ {
+ // Filter out pages that fall entirely before the minimum commit timestamp and sort the remaining pages by
+ // commit timestamp.
+ var upperRange = catalogIndex
+ .Items
+ .Where(x => x.CommitTimestamp > minCommitTimestamp)
+ .OrderBy(x => x.CommitTimestamp);
+
+ // Take pages from the sorted list until the commit timestamp goes past the maximum commit timestamp. This
+ // is essentially LINQ's TakeWhile plus one more element.
+ foreach (var page in upperRange)
+ {
+ yield return page;
+ // The first page past the maximum has already been yielded above; only later pages are dropped.
+ if (page.CommitTimestamp > maxCommitTimestamp)
+ {
+ break;
+ }
+ }
+ }
+
+ /// <summary>
+ /// Interprets the leaf's package version string as a <see cref="NuGetVersion"/>.
+ /// </summary>
+ /// <param name="leaf">The catalog leaf.</param>
+ /// <returns>
+ /// The package version.
+ /// </returns>
+ public static NuGetVersion ParsePackageVersion(this ICatalogLeafItem leaf)
+ => NuGetVersion.Parse(leaf.PackageVersion);
+
+ /// <summary>
+ /// Interprets the group's target framework string as a <see cref="NuGetFramework"/>.
+ /// </summary>
+ /// <param name="packageDependencyGroup">The package dependency group.</param>
+ /// <returns>
+ /// <see cref="NuGetFramework.AnyFramework"/> for a null or empty string; otherwise the parsed framework.
+ /// </returns>
+ public static NuGetFramework ParseTargetFramework(this PackageDependencyGroup packageDependencyGroup)
+ {
+ var rawFramework = packageDependencyGroup.TargetFramework;
+ return string.IsNullOrEmpty(rawFramework)
+ ? NuGetFramework.AnyFramework
+ : NuGetFramework.Parse(rawFramework);
+ }
+
+ /// <summary>
+ /// Interprets the dependency's range string as a <see cref="VersionRange"/>.
+ /// </summary>
+ /// <param name="packageDependency">The package dependency.</param>
+ /// <returns>
+ /// <see cref="VersionRange.All"/> for a null or empty string; otherwise the parsed version range.
+ /// </returns>
+ public static VersionRange ParseRange(this PackageDependency packageDependency)
+ {
+ var rawRange = packageDependency.Range;
+ return string.IsNullOrEmpty(rawRange)
+ ? VersionRange.All
+ : VersionRange.Parse(rawRange);
+ }
+
+ /// <summary>
+ /// Tells whether the provided catalog leaf records a package delete.
+ /// </summary>
+ /// <param name="leaf">The catalog leaf.</param>
+ /// <returns>
+ /// True if the catalog leaf represents a package delete.
+ /// </returns>
+ public static bool IsPackageDelete(this ICatalogLeafItem leaf)
+ => leaf.Type == CatalogLeafType.PackageDelete;
+
+ /// <summary>
+ /// Tells whether the provided catalog leaf contains package details.
+ /// </summary>
+ /// <param name="leaf">The catalog leaf.</param>
+ /// <returns>
+ /// True if the catalog leaf contains package details.
+ /// </returns>
+ public static bool IsPackageDetails(this ICatalogLeafItem leaf)
+ => leaf.Type == CatalogLeafType.PackageDetails;
+
+ /// <summary>
+ /// Decides whether the package described by a package details leaf is listed.
+ /// </summary>
+ /// <param name="leaf">The catalog leaf.</param>
+ /// <returns>True if the package is listed.</returns>
+ public static bool IsListed(this PackageDetailsCatalogLeaf leaf)
+ {
+ var explicitlyListed = leaf.Listed;
+ if (!explicitlyListed.HasValue)
+ {
+ // Legacy behavior: when the listed property itself is absent, a published year of 1900 marks the
+ // package as unlisted.
+ return leaf.Published.Year != 1900;
+ }
+ return explicitlyListed.Value;
+ }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/PackageDeleteCatalogLeaf.cs b/src/NuGet.Protocol.Catalog/Models/PackageDeleteCatalogLeaf.cs
new file mode 100644
index 000000000..de8c1fae8
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/PackageDeleteCatalogLeaf.cs
@@ -0,0 +1,9 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+namespace NuGet.Protocol.Catalog
+{
+ public class PackageDeleteCatalogLeaf : CatalogLeaf // Leaf document for a package delete; adds no members to CatalogLeaf.
+ {
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/PackageDependency.cs b/src/NuGet.Protocol.Catalog/Models/PackageDependency.cs
new file mode 100644
index 000000000..d2f91698a
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/PackageDependency.cs
@@ -0,0 +1,16 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class PackageDependency // One dependency entry of a package dependency group.
+ {
+ [JsonProperty("id")]
+ public string Id { get; set; }
+ // Raw range string; ParseRange treats null or empty as VersionRange.All.
+ [JsonProperty("range")]
+ public string Range { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/PackageDependencyGroup.cs b/src/NuGet.Protocol.Catalog/Models/PackageDependencyGroup.cs
new file mode 100644
index 000000000..da4608de5
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/PackageDependencyGroup.cs
@@ -0,0 +1,17 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class PackageDependencyGroup // The dependencies declared for one target framework.
+ {
+ [JsonProperty("targetFramework")]
+ public string TargetFramework { get; set; } // Raw string; ParseTargetFramework treats null or empty as AnyFramework.
+ // The dependency entries of this group.
+ [JsonProperty("dependencies")]
+ public List<PackageDependency> Dependencies { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Models/PackageDetailsCatalogLeaf.cs b/src/NuGet.Protocol.Catalog/Models/PackageDetailsCatalogLeaf.cs
new file mode 100644
index 000000000..284fe004d
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Models/PackageDetailsCatalogLeaf.cs
@@ -0,0 +1,75 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ public class PackageDetailsCatalogLeaf : CatalogLeaf // Leaf document carrying a package's metadata.
+ {
+ [JsonProperty("authors")]
+ public string Authors { get; set; }
+
+ [JsonProperty("created")]
+ public DateTimeOffset Created { get; set; }
+
+ [JsonProperty("lastEdited")]
+ public DateTimeOffset LastEdited { get; set; }
+ // One group per target framework; see PackageDependencyGroup.
+ [JsonProperty("dependencyGroups")]
+ public List<PackageDependencyGroup> DependencyGroups { get; set; }
+
+ [JsonProperty("description")]
+ public string Description { get; set; }
+
+ [JsonProperty("iconUrl")]
+ public string IconUrl { get; set; }
+
+ [JsonProperty("isPrerelease")]
+ public bool IsPrerelease { get; set; }
+
+ [JsonProperty("language")]
+ public string Language { get; set; }
+
+ [JsonProperty("licenseUrl")]
+ public string LicenseUrl { get; set; }
+ // Nullable because legacy leaves omit it; IsListed then falls back to the published year.
+ [JsonProperty("listed")]
+ public bool? Listed { get; set; }
+
+ [JsonProperty("minClientVersion")]
+ public string MinClientVersion { get; set; }
+
+ [JsonProperty("packageHash")]
+ public string PackageHash { get; set; }
+
+ [JsonProperty("packageHashAlgorithm")]
+ public string PackageHashAlgorithm { get; set; }
+
+ [JsonProperty("packageSize")]
+ public long PackageSize { get; set; }
+
+ [JsonProperty("projectUrl")]
+ public string ProjectUrl { get; set; }
+
+ [JsonProperty("releaseNotes")]
+ public string ReleaseNotes { get; set; }
+
+ [JsonProperty("requireLicenseAgreement")]
+ public bool? RequireLicenseAgreement { get; set; }
+
+ [JsonProperty("summary")]
+ public string Summary { get; set; }
+ // NOTE(review): element type restored as string on the assumption that tags are plain strings; confirm.
+ [JsonProperty("tags")]
+ public List<string> Tags { get; set; }
+
+ [JsonProperty("title")]
+ public string Title { get; set; }
+
+ [JsonProperty("verbatimVersion")]
+ public string VerbatimVersion { get; set; }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/NuGet.Protocol.Catalog.csproj b/src/NuGet.Protocol.Catalog/NuGet.Protocol.Catalog.csproj
new file mode 100644
index 000000000..ece75944d
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/NuGet.Protocol.Catalog.csproj
@@ -0,0 +1,103 @@
+
+
+
+
+ Debug
+ AnyCPU
+ {D44C2E89-2D98-44BD-8712-8CCBE4E67C9C}
+ Library
+ Properties
+ NuGet.Protocol.Catalog
+ NuGet.Protocol.Catalog
+ v4.6.2
+ 512
+ true
+
+
+ true
+ full
+ false
+ bin\Debug\
+ DEBUG;TRACE
+ prompt
+ 4
+
+
+ pdbonly
+ true
+ bin\Release\
+ TRACE
+ prompt
+ 4
+
+
+ NuGet.Protocol.Catalog
+ A .NET library for consuming the NuGet API's catalog resource.
+ .NET Foundation
+ https://github.com/NuGet/NuGet.Services.Metadata/blob/master/LICENSE
+ https://github.com/NuGet/NuGet.Services.Metadata
+ git
+ https://github.com/NuGet/NuGet.Services.Metadata
+
+
+
+
+
+
+ 0.3.0
+ runtime; build; native; contentfiles; analyzers
+ all
+
+
+ 1.0.0
+
+
+ 9.0.1
+
+
+ 4.8.0
+ runtime; build; native; contentfiles; analyzers
+ all
+
+
+ 4.8.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ ..\..\build
+ $(BUILD_SOURCESDIRECTORY)\build
+ $(NuGetBuildPath)
+ none
+
+
+
+
+
\ No newline at end of file
diff --git a/src/NuGet.Protocol.Catalog/Properties/AssemblyInfo.cs b/src/NuGet.Protocol.Catalog/Properties/AssemblyInfo.cs
new file mode 100644
index 000000000..ace49ddfa
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Properties/AssemblyInfo.cs
@@ -0,0 +1,16 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+[assembly: AssemblyTitle("NuGet.Protocol.Catalog")]
+[assembly: ComVisible(false)]
+[assembly: Guid("d44c2e89-2d98-44bd-8712-8ccbe4e67c9c")]
+
+#if SIGNED_BUILD
+[assembly: InternalsVisibleTo("NuGet.Protocol.Catalog.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b5fc90e7027f67871e773a8fde8938c81dd402ba65b9201d60593e96c492651e889cc13f1415ebb53fac1131ae0bd333c5ee6021672d9718ea31a8aebd0da0072f25d87dba6fc90ffd598ed4da35e44c398c454307e8e33b8426143daec9f596836f97c8f74750e5975c64e2189f45def46b2a2b1247adc3652bf5c308055da9")]
+#else
+[assembly: InternalsVisibleTo("NuGet.Protocol.Catalog.Tests")]
+#endif
diff --git a/src/NuGet.Protocol.Catalog/Serialization/BaseCatalogLeafConverter.cs b/src/NuGet.Protocol.Catalog/Serialization/BaseCatalogLeafConverter.cs
new file mode 100644
index 000000000..fa0fa8835
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Serialization/BaseCatalogLeafConverter.cs
@@ -0,0 +1,35 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// Base class for JSON converters that handle <see cref="CatalogLeafType"/> values.
+ /// It implements the write direction (enum value to string) using a mapping supplied by the
+ /// derived class; derived classes implement <c>ReadJson</c> themselves.
+ /// </summary>
+ internal abstract class BaseCatalogLeafConverter : JsonConverter
+ {
+     // Maps each supported leaf type to the string that is written to JSON.
+     private readonly IReadOnlyDictionary<CatalogLeafType, string> _fromType;
+
+     /// <summary>
+     /// Initializes the converter with the mapping used when writing values.
+     /// </summary>
+     /// <param name="fromType">The leaf type to JSON string mapping used by <see cref="WriteJson"/>.</param>
+     public BaseCatalogLeafConverter(IReadOnlyDictionary<CatalogLeafType, string> fromType)
+     {
+         _fromType = fromType;
+     }
+
+     /// <summary>
+     /// Reports whether this converter applies to the given type.
+     /// </summary>
+     /// <param name="objectType">The type being serialized or deserialized.</param>
+     /// <returns><c>true</c> only when <paramref name="objectType"/> is exactly <see cref="CatalogLeafType"/>.</returns>
+     public override bool CanConvert(Type objectType)
+     {
+         return objectType == typeof(CatalogLeafType);
+     }
+
+     /// <summary>
+     /// Writes the string mapped to the provided <see cref="CatalogLeafType"/> value.
+     /// </summary>
+     /// <param name="writer">The writer that receives the mapped string.</param>
+     /// <param name="value">The value to write; it is cast to <see cref="CatalogLeafType"/>.</param>
+     /// <param name="serializer">The calling serializer (not used).</param>
+     /// <exception cref="NotSupportedException">The value has no entry in the mapping.</exception>
+     public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
+     {
+         string output;
+         if (_fromType.TryGetValue((CatalogLeafType)value, out output))
+         {
+             writer.WriteValue(output);
+
+             // Stop here: without this return a successfully written value would still
+             // fall through to the exception below.
+             return;
+         }
+
+         throw new NotSupportedException($"The catalog leaf type '{value}' is not supported.");
+     }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Serialization/CatalogJsonSerialization.cs b/src/NuGet.Protocol.Catalog/Serialization/CatalogJsonSerialization.cs
new file mode 100644
index 000000000..6cd1f14a5
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Serialization/CatalogJsonSerialization.cs
@@ -0,0 +1,19 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// Central place for the JSON serialization configuration used by this library.
+ /// </summary>
+ internal static class CatalogJsonSerialization
+ {
+     /// <summary>
+     /// Gets a serializer built from <see cref="Settings"/>. A new serializer is created on every access.
+     /// </summary>
+     public static JsonSerializer Serializer
+     {
+         get { return JsonSerializer.Create(Settings); }
+     }
+
+     /// <summary>
+     /// Gets the serializer settings. A new settings object is created on every access, so
+     /// changes made to a returned instance do not affect later callers.
+     /// </summary>
+     public static JsonSerializerSettings Settings
+     {
+         get
+         {
+             var settings = new JsonSerializerSettings();
+             settings.DateTimeZoneHandling = DateTimeZoneHandling.Utc;
+             settings.DateParseHandling = DateParseHandling.DateTimeOffset;
+             settings.NullValueHandling = NullValueHandling.Ignore;
+             return settings;
+         }
+     }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafItemTypeConverter.cs b/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafItemTypeConverter.cs
new file mode 100644
index 000000000..85dac704c
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafItemTypeConverter.cs
@@ -0,0 +1,41 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ /// <summary>
+ /// Converts <see cref="CatalogLeafType"/> values to and from the prefixed strings
+ /// "nuget:PackageDelete" and "nuget:PackageDetails".
+ /// NOTE(review): judging by the class name these are the type strings found on catalog
+ /// leaf items; confirm against the model that applies this converter.
+ /// </summary>
+ internal class CatalogLeafItemTypeConverter : BaseCatalogLeafConverter
+ {
+     // Leaf type to JSON string; handed to the base class for writing.
+     private static readonly Dictionary<CatalogLeafType, string> FromType = new Dictionary<CatalogLeafType, string>
+     {
+         { CatalogLeafType.PackageDelete, "nuget:PackageDelete" },
+         { CatalogLeafType.PackageDetails, "nuget:PackageDetails" },
+     };
+
+     // Inverse of FromType, used for reading. It must stay declared after FromType because
+     // static field initializers run in textual order.
+     private static readonly Dictionary<string, CatalogLeafType> FromString = FromType
+         .ToDictionary(x => x.Value, x => x.Key);
+
+     public CatalogLeafItemTypeConverter() : base(FromType)
+     {
+     }
+
+     /// <summary>
+     /// Reads the current token of <paramref name="reader"/> and maps it to a <see cref="CatalogLeafType"/>.
+     /// </summary>
+     /// <param name="reader">The reader positioned on the value to convert.</param>
+     /// <param name="objectType">The requested target type (not used).</param>
+     /// <param name="existingValue">The existing value of the target (not used).</param>
+     /// <param name="serializer">The calling serializer (not used).</param>
+     /// <returns>The boxed <see cref="CatalogLeafType"/> matching the string value.</returns>
+     /// <exception cref="JsonSerializationException">
+     /// The current value is not a string or is not one of the known type strings.
+     /// </exception>
+     public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+     {
+         string stringValue = reader.Value as string;
+         CatalogLeafType output;
+         if (stringValue != null && FromString.TryGetValue(stringValue, out output))
+         {
+             return output;
+         }
+
+         throw new JsonSerializationException($"Unexpected value for a {nameof(CatalogLeafType)}.");
+     }
+ }
+}
diff --git a/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafTypeConverter.cs b/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafTypeConverter.cs
new file mode 100644
index 000000000..d8bbc576d
--- /dev/null
+++ b/src/NuGet.Protocol.Catalog/Serialization/CatalogLeafTypeConverter.cs
@@ -0,0 +1,50 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Newtonsoft.Json;
+
+namespace NuGet.Protocol.Catalog
+{
+ internal class CatalogLeafTypeConverter : BaseCatalogLeafConverter
+ {
+ private static readonly Dictionary FromType = new Dictionary
+ {
+ { CatalogLeafType.PackageDelete, "PackageDelete" },
+ { CatalogLeafType.PackageDetails, "PackageDetails" },
+ };
+
+ private static readonly Dictionary FromString = FromType
+ .ToDictionary(x => x.Value, x => x.Key);
+
+ public CatalogLeafTypeConverter() : base(FromType)
+ {
+ }
+
+ public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
+ {
+ List