Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Allow storage different from CDN to be tested by RegistrationComparer
Browse files Browse the repository at this point in the history
  • Loading branch information
joelverhagen committed Feb 24, 2020
1 parent 4b78882 commit b39a78c
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 59 deletions.
2 changes: 1 addition & 1 deletion src/NuGet.Jobs.RegistrationComparer/CursorUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static Dictionary<string, ReadCursor> GetRegistrationCursors(
var hiveCursors = new Dictionary<string, ReadCursor>();
foreach (var hives in options.Value.Registrations)
{
var cursorUrl = new Uri(hives.LegacyBaseUrl.TrimEnd('/') + "/cursor.json");
var cursorUrl = new Uri(hives.Legacy.StorageBaseUrl.TrimEnd('/') + "/cursor.json");
hiveCursors.Add(cursorUrl.AbsoluteUri, new HttpReadCursor(cursorUrl, DateTime.MinValue, handlerFunc));
}

Expand Down
122 changes: 73 additions & 49 deletions src/NuGet.Jobs.RegistrationComparer/HiveComparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -21,6 +23,8 @@ namespace NuGet.Jobs.RegistrationComparer
{
public class HiveComparer
{
private static long _requestId = 0;

private readonly HttpClient _httpClient;
private readonly JsonComparer _comparer;
private readonly IOptionsSnapshot<RegistrationComparerConfiguration> _options;
Expand All @@ -39,19 +43,19 @@ public HiveComparer(
}

public async Task CompareAsync(
IReadOnlyList<string> baseUrls,
IReadOnlyList<HiveConfiguration> hives,
string id,
IReadOnlyList<string> versions)
{
if (baseUrls.Count <= 1)
if (hives.Count <= 1)
{
throw new ArgumentException("At least two base URLs must be provided.", nameof(baseUrls));
throw new ArgumentException("At least two hive configurations must be provided.", nameof(hives));
}

// Compare the indexes.
var rawIndexes = await Task.WhenAll(baseUrls.Select(x => GetIndexAsync(x, id)));
var rawIndexes = await Task.WhenAll(hives.Select(x => GetIndexAsync(x, id)));
var areBothMissing = false;
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
if (AreBothMissing(
rawIndexes[i - 1].Url,
Expand All @@ -65,8 +69,8 @@ public async Task CompareAsync(

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
rawIndexes[i - 1].Url,
rawIndexes[i].Url,
Normalizers.Index);
Expand All @@ -87,7 +91,9 @@ public async Task CompareAsync(
foreach (var rawIndex in rawIndexes)
{
indexes.Add(new DownloadedData<RegistrationIndex>(
rawIndex.Hive,
rawIndex.Url,
rawIndex.StorageUrl,
rawIndex.Data.ToObject<RegistrationIndex>(NuGetJsonSerialization.Serializer)));
}

Expand All @@ -97,67 +103,71 @@ public async Task CompareAsync(
.Data
.Items
.Where(p => p.Items == null)
.Select(p => p.Url)
.Select(p => new { x.Hive, p.Url })
.ToList())
.ToList();
var leafUrlGroups = baseUrls
.Select(x => versions.Select(v => $"{x}{id}/{v}.json").ToList())
var leafUrlGroups = hives
.Select(x => versions
.Select(v => $"{x.BaseUrl}{id}/{v}.json")
.Select(u => new { Hive = x, Url = u })
.ToList())
.ToList();

var urls = new ConcurrentBag<string>(pageUrlGroups
var pairs = new ConcurrentBag<KeyValuePair<string, HiveConfiguration>>(pageUrlGroups
.SelectMany(x => x)
.Concat(leafUrlGroups.SelectMany(x => x)));
.Concat(leafUrlGroups.SelectMany(x => x))
.Select(x => new KeyValuePair<string, HiveConfiguration>(x.Url, x.Hive)));
var urlToJson = new ConcurrentDictionary<string, JObject>();
await ParallelAsync.Repeat(
async () =>
{
await Task.Yield();
while (urls.TryTake(out var pageUrl))
while (pairs.TryTake(out var pair))
{
var json = await GetJObjectOrNullAsync(pageUrl);
urlToJson.TryAdd(pageUrl, json.Data);
var data = await GetJObjectOrNullAsync(pair.Value, pair.Key);
urlToJson.TryAdd(pair.Key, data.Data);
}
},
_options.Value.MaxConcurrentPageAndLeafDownloadsPerId);

// Compare the pages.
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
for (var pageIndex = 0; pageIndex < pageUrlGroups[i].Count; pageIndex++)
{
var leftUrl = pageUrlGroups[i - 1][pageIndex];
var rightUrl = pageUrlGroups[i][pageIndex];
var left = pageUrlGroups[i - 1][pageIndex];
var right = pageUrlGroups[i][pageIndex];

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
leftUrl,
rightUrl,
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
left.Url,
right.Url,
Normalizers.Page);

_comparer.Compare(
urlToJson[leftUrl],
urlToJson[rightUrl],
urlToJson[left.Url],
urlToJson[right.Url],
comparisonContext);
}
}

// Compare the affected leaves.
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
for (var leafIndex = 0; leafIndex < leafUrlGroups[i].Count; leafIndex++)
{
var leftUrl = leafUrlGroups[i - 1][leafIndex];
var rightUrl = leafUrlGroups[i][leafIndex];
var left = leafUrlGroups[i - 1][leafIndex];
var right = leafUrlGroups[i][leafIndex];

try
{
if (AreBothMissing(
leftUrl,
rightUrl,
urlToJson[leftUrl],
urlToJson[rightUrl]))
left.Url,
right.Url,
urlToJson[left.Url],
urlToJson[right.Url]))
{
continue;
}
Expand All @@ -170,15 +180,15 @@ await ParallelAsync.Repeat(

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
leftUrl,
rightUrl,
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
left.Url,
right.Url,
Normalizers.Leaf);

_comparer.Compare(
urlToJson[leftUrl],
urlToJson[rightUrl],
urlToJson[left.Url],
urlToJson[right.Url],
comparisonContext);
}
}
Expand All @@ -199,50 +209,64 @@ private bool AreBothMissing(string leftUrl, string rightUrl, JObject left, JObje
return left == null;
}

private async Task<DownloadedData<JObject>> GetIndexAsync(string baseUrl, string id)
private async Task<DownloadedData<JObject>> GetIndexAsync(HiveConfiguration hive, string id)
{
var url = $"{baseUrl}{id}/index.json";
return await GetJObjectOrNullAsync(url);
var url = $"{hive.BaseUrl}{id}/index.json";
return await GetJObjectOrNullAsync(hive, url);
}

private async Task<DownloadedData<JObject>> GetJObjectOrNullAsync(string url)
private async Task<DownloadedData<JObject>> GetJObjectOrNullAsync(HiveConfiguration hive, string url)
{
using (var response = await _httpClient.GetAsync(url))
if (!url.StartsWith(hive.BaseUrl))
{
throw new ArgumentException("The provided URL must start with the hive base URL.");
}

var storageUrl = hive.StorageBaseUrl + url.Substring(hive.BaseUrl.Length);
var requestId = Interlocked.Increment(ref _requestId);
_logger.LogInformation("[Request {RequestId}] Fetching {Url}", requestId, storageUrl);
var stopwatch = Stopwatch.StartNew();
using (var response = await _httpClient.GetAsync(storageUrl, HttpCompletionOption.ResponseContentRead))
{
_logger.LogInformation(
"Fetched {Url}: {StatusCode} {ReasonPhrase}",
url,
"[Request {RequestId}] Got {StatusCode} {ReasonPhrase} after {DurationMs}ms",
requestId,
(int)response.StatusCode,
response.ReasonPhrase);
response.ReasonPhrase,
(int)stopwatch.Elapsed.TotalMilliseconds);

if (response.StatusCode == HttpStatusCode.NotFound)
{
return new DownloadedData<JObject>(url, null);
return new DownloadedData<JObject>(hive, url, storageUrl, null);
}

response.EnsureSuccessStatusCode();

using (var stream = await _httpClient.GetStreamAsync(url))
using (var stream = await response.Content.ReadAsStreamAsync())
using (var streamReader = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(streamReader))
{
jsonTextReader.DateParseHandling = DateParseHandling.None;

var data = JObject.Load(jsonTextReader);
return new DownloadedData<JObject>(url, data);
return new DownloadedData<JObject>(hive, url, storageUrl, data);
}
}
}

private class DownloadedData<T>
{
public DownloadedData(string url, T data)
public DownloadedData(HiveConfiguration hive, string url, string storageUrl, T data)
{
Hive = hive;
Url = url;
StorageUrl = storageUrl;
Data = data;
}

public HiveConfiguration Hive { get; }
public string Url { get; }
public string StorageUrl { get; }
public T Data { get; }
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/NuGet.Jobs.RegistrationComparer/HiveConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

namespace NuGet.Jobs.RegistrationComparer
{
/// <summary>
/// The pair of base URLs describing one registration hive that takes part in a comparison.
/// </summary>
public class HiveConfiguration
{
/// <summary>
/// Base URL that HTTP requests are actually sent to. HiveComparer replaces the
/// <see cref="BaseUrl"/> prefix of a URL with this value before fetching it.
/// </summary>
public string StorageBaseUrl { get; set; }
/// <summary>
/// Base URL used to build index and leaf URLs (for example <c>{BaseUrl}{id}/index.json</c>).
/// HiveComparer rejects any URL that does not start with this value.
/// NOTE(review): presumably the public (CDN-facing) address of the hive; confirm against deployment configuration.
/// </summary>
public string BaseUrl { get; set; }
}
}
6 changes: 3 additions & 3 deletions src/NuGet.Jobs.RegistrationComparer/HivesConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace NuGet.Jobs.RegistrationComparer
{
public class HivesConfiguration
{
public string LegacyBaseUrl { get; set; }
public string GzippedBaseUrl { get; set; }
public string SemVer2BaseUrl { get; set; }
public HiveConfiguration Legacy { get; set; }
public HiveConfiguration Gzipped { get; set; }
public HiveConfiguration SemVer2 { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<ItemGroup>
<Compile Include="ComparisonContext.cs" />
<Compile Include="CursorUtility.cs" />
<Compile Include="HiveConfiguration.cs" />
<Compile Include="Normalizers.cs" />
<Compile Include="RegistrationComparerCollectorLogic.cs" />
<Compile Include="HiveComparer.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,34 @@ public async Task OnProcessBatchAsync(IEnumerable<CatalogCommitItem> items)
.Registrations
.SelectMany(x => new[]
{
new { Hive = nameof(x.LegacyBaseUrl), Url = x.LegacyBaseUrl.TrimEnd('/') + '/' },
new { Hive = nameof(x.GzippedBaseUrl), Url = x.GzippedBaseUrl.TrimEnd('/') + '/' },
new { Hive = nameof(x.SemVer2BaseUrl), Url = x.SemVer2BaseUrl.TrimEnd('/') + '/' },
new
{
Hive = nameof(x.Legacy),
BaseUrl = x.Legacy.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.Legacy.StorageBaseUrl.TrimEnd('/') + '/',
},
new
{
Hive = nameof(x.Gzipped),
BaseUrl = x.Gzipped.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.Gzipped.StorageBaseUrl.TrimEnd('/') + '/',
},
new
{
Hive = nameof(x.SemVer2),
BaseUrl = x.SemVer2.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.SemVer2.StorageBaseUrl.TrimEnd('/') + '/'
},
})
.GroupBy(x => x.Hive, x => x.Url);
.GroupBy(x => x.Hive, x => new HiveConfiguration { BaseUrl = x.BaseUrl, StorageBaseUrl = x.StorageBaseUrl });

var allWork = new ConcurrentBag<Func<Task>>();
var failures = 0;
foreach (var group in packageIdGroups)
{
foreach (var hiveGroup in hiveGroups)
{
var baseUrls = hiveGroup.ToList();
var hives = hiveGroup.ToList();
var hive = hiveGroup.Key;
var id = group.Id;
var versions = group.Versions;
Expand All @@ -83,7 +98,7 @@ public async Task OnProcessBatchAsync(IEnumerable<CatalogCommitItem> items)
_logger.LogInformation("Verifying hive {Hive} for {PackageId}.", hive, id);
try
{
await _comparer.CompareAsync(baseUrls, id, versions);
await _comparer.CompareAsync(hives, id, versions);
}
catch (Exception ex)
{
Expand Down

0 comments on commit b39a78c

Please sign in to comment.