Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Allow storage different from CDN to be tested by RegistrationComparer #749

Merged
merged 1 commit into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/NuGet.Jobs.RegistrationComparer/CursorUtility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static Dictionary<string, ReadCursor> GetRegistrationCursors(
var hiveCursors = new Dictionary<string, ReadCursor>();
foreach (var hives in options.Value.Registrations)
{
var cursorUrl = new Uri(hives.LegacyBaseUrl.TrimEnd('/') + "/cursor.json");
var cursorUrl = new Uri(hives.Legacy.StorageBaseUrl.TrimEnd('/') + "/cursor.json");
hiveCursors.Add(cursorUrl.AbsoluteUri, new HttpReadCursor(cursorUrl, DateTime.MinValue, handlerFunc));
}

Expand Down
122 changes: 73 additions & 49 deletions src/NuGet.Jobs.RegistrationComparer/HiveComparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
Expand All @@ -21,6 +23,8 @@ namespace NuGet.Jobs.RegistrationComparer
{
public class HiveComparer
{
private static long _requestId = 0;

private readonly HttpClient _httpClient;
private readonly JsonComparer _comparer;
private readonly IOptionsSnapshot<RegistrationComparerConfiguration> _options;
Expand All @@ -39,19 +43,19 @@ public HiveComparer(
}

public async Task CompareAsync(
IReadOnlyList<string> baseUrls,
IReadOnlyList<HiveConfiguration> hives,
string id,
IReadOnlyList<string> versions)
{
if (baseUrls.Count <= 1)
if (hives.Count <= 1)
{
throw new ArgumentException("At least two base URLs must be provided.", nameof(baseUrls));
throw new ArgumentException("At least two hive configurations must be provided.", nameof(hives));
}

// Compare the indexes.
var rawIndexes = await Task.WhenAll(baseUrls.Select(x => GetIndexAsync(x, id)));
var rawIndexes = await Task.WhenAll(hives.Select(x => GetIndexAsync(x, id)));
var areBothMissing = false;
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
if (AreBothMissing(
rawIndexes[i - 1].Url,
Expand All @@ -65,8 +69,8 @@ public async Task CompareAsync(

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
rawIndexes[i - 1].Url,
rawIndexes[i].Url,
Normalizers.Index);
Expand All @@ -87,7 +91,9 @@ public async Task CompareAsync(
foreach (var rawIndex in rawIndexes)
{
indexes.Add(new DownloadedData<RegistrationIndex>(
rawIndex.Hive,
rawIndex.Url,
rawIndex.StorageUrl,
rawIndex.Data.ToObject<RegistrationIndex>(NuGetJsonSerialization.Serializer)));
}

Expand All @@ -97,67 +103,71 @@ public async Task CompareAsync(
.Data
.Items
.Where(p => p.Items == null)
.Select(p => p.Url)
.Select(p => new { x.Hive, p.Url })
.ToList())
.ToList();
var leafUrlGroups = baseUrls
.Select(x => versions.Select(v => $"{x}{id}/{v}.json").ToList())
var leafUrlGroups = hives
.Select(x => versions
.Select(v => $"{x.BaseUrl}{id}/{v}.json")
.Select(u => new { Hive = x, Url = u })
.ToList())
.ToList();

var urls = new ConcurrentBag<string>(pageUrlGroups
var pairs = new ConcurrentBag<KeyValuePair<string, HiveConfiguration>>(pageUrlGroups
.SelectMany(x => x)
.Concat(leafUrlGroups.SelectMany(x => x)));
.Concat(leafUrlGroups.SelectMany(x => x))
.Select(x => new KeyValuePair<string, HiveConfiguration>(x.Url, x.Hive)));
Copy link
Contributor

@agr agr Feb 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KeyValuePair.Create(x.Url, x.Hive) would be a bit more compact.
What is Url here, by the way? Is it just to be able to throw exception in GetJObjectOrNullAsync? If yes, given that it is a private method, can we validate data earlier?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the official (not blob storage) URL of a registration leaf that needs to be downloaded and validated.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URL is there so we can parallelize the downloads of many leaves and pages.

I'll use the KeyValuePair.Create trick.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var urlToJson = new ConcurrentDictionary<string, JObject>();
await ParallelAsync.Repeat(
async () =>
{
await Task.Yield();
while (urls.TryTake(out var pageUrl))
while (pairs.TryTake(out var pair))
{
var json = await GetJObjectOrNullAsync(pageUrl);
urlToJson.TryAdd(pageUrl, json.Data);
var data = await GetJObjectOrNullAsync(pair.Value, pair.Key);
urlToJson.TryAdd(pair.Key, data.Data);
}
},
_options.Value.MaxConcurrentPageAndLeafDownloadsPerId);

// Compare the pages.
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
for (var pageIndex = 0; pageIndex < pageUrlGroups[i].Count; pageIndex++)
{
var leftUrl = pageUrlGroups[i - 1][pageIndex];
var rightUrl = pageUrlGroups[i][pageIndex];
var left = pageUrlGroups[i - 1][pageIndex];
var right = pageUrlGroups[i][pageIndex];

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
leftUrl,
rightUrl,
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
left.Url,
right.Url,
Normalizers.Page);

_comparer.Compare(
urlToJson[leftUrl],
urlToJson[rightUrl],
urlToJson[left.Url],
urlToJson[right.Url],
comparisonContext);
}
}

// Compare the affected leaves.
for (var i = 1; i < baseUrls.Count; i++)
for (var i = 1; i < hives.Count; i++)
{
for (var leafIndex = 0; leafIndex < leafUrlGroups[i].Count; leafIndex++)
{
var leftUrl = leafUrlGroups[i - 1][leafIndex];
var rightUrl = leafUrlGroups[i][leafIndex];
var left = leafUrlGroups[i - 1][leafIndex];
var right = leafUrlGroups[i][leafIndex];

try
{
if (AreBothMissing(
leftUrl,
rightUrl,
urlToJson[leftUrl],
urlToJson[rightUrl]))
left.Url,
right.Url,
urlToJson[left.Url],
urlToJson[right.Url]))
{
continue;
}
Expand All @@ -170,15 +180,15 @@ await ParallelAsync.Repeat(

var comparisonContext = new ComparisonContext(
id,
baseUrls[i - 1],
baseUrls[i],
leftUrl,
rightUrl,
hives[i - 1].BaseUrl,
hives[i].BaseUrl,
left.Url,
right.Url,
Normalizers.Leaf);

_comparer.Compare(
urlToJson[leftUrl],
urlToJson[rightUrl],
urlToJson[left.Url],
urlToJson[right.Url],
comparisonContext);
}
}
Expand All @@ -199,50 +209,64 @@ private bool AreBothMissing(string leftUrl, string rightUrl, JObject left, JObje
return left == null;
}

private async Task<DownloadedData<JObject>> GetIndexAsync(string baseUrl, string id)
private async Task<DownloadedData<JObject>> GetIndexAsync(HiveConfiguration hive, string id)
{
var url = $"{baseUrl}{id}/index.json";
return await GetJObjectOrNullAsync(url);
var url = $"{hive.BaseUrl}{id}/index.json";
return await GetJObjectOrNullAsync(hive, url);
}

private async Task<DownloadedData<JObject>> GetJObjectOrNullAsync(string url)
private async Task<DownloadedData<JObject>> GetJObjectOrNullAsync(HiveConfiguration hive, string url)
{
using (var response = await _httpClient.GetAsync(url))
if (!url.StartsWith(hive.BaseUrl))
{
throw new ArgumentException("The provided URL must start with the hive base URL.");
}

var storageUrl = hive.StorageBaseUrl + url.Substring(hive.BaseUrl.Length);
var requestId = Interlocked.Increment(ref _requestId);
_logger.LogInformation("[Request {RequestId}] Fetching {Url}", requestId, storageUrl);
var stopwatch = Stopwatch.StartNew();
using (var response = await _httpClient.GetAsync(storageUrl, HttpCompletionOption.ResponseContentRead))
{
_logger.LogInformation(
"Fetched {Url}: {StatusCode} {ReasonPhrase}",
url,
"[Request {RequestId}] Got {StatusCode} {ReasonPhrase} after {DurationMs}ms",
requestId,
(int)response.StatusCode,
response.ReasonPhrase);
response.ReasonPhrase,
(int)stopwatch.Elapsed.TotalMilliseconds);

if (response.StatusCode == HttpStatusCode.NotFound)
{
return new DownloadedData<JObject>(url, null);
return new DownloadedData<JObject>(hive, url, storageUrl, null);
}

response.EnsureSuccessStatusCode();

using (var stream = await _httpClient.GetStreamAsync(url))
using (var stream = await response.Content.ReadAsStreamAsync())
using (var streamReader = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(streamReader))
{
jsonTextReader.DateParseHandling = DateParseHandling.None;

var data = JObject.Load(jsonTextReader);
return new DownloadedData<JObject>(url, data);
return new DownloadedData<JObject>(hive, url, storageUrl, data);
}
}
}

private class DownloadedData<T>
{
public DownloadedData(string url, T data)
public DownloadedData(HiveConfiguration hive, string url, string storageUrl, T data)
{
Hive = hive;
Url = url;
StorageUrl = storageUrl;
Data = data;
}

public HiveConfiguration Hive { get; }
public string Url { get; }
public string StorageUrl { get; }
public T Data { get; }
}
}
Expand Down
11 changes: 11 additions & 0 deletions src/NuGet.Jobs.RegistrationComparer/HiveConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

namespace NuGet.Jobs.RegistrationComparer
{
public class HiveConfiguration
{
public string StorageBaseUrl { get; set; }
public string BaseUrl { get; set; }
}
}
6 changes: 3 additions & 3 deletions src/NuGet.Jobs.RegistrationComparer/HivesConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ namespace NuGet.Jobs.RegistrationComparer
{
public class HivesConfiguration
{
public string LegacyBaseUrl { get; set; }
public string GzippedBaseUrl { get; set; }
public string SemVer2BaseUrl { get; set; }
public HiveConfiguration Legacy { get; set; }
public HiveConfiguration Gzipped { get; set; }
public HiveConfiguration SemVer2 { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<ItemGroup>
<Compile Include="ComparisonContext.cs" />
<Compile Include="CursorUtility.cs" />
<Compile Include="HiveConfiguration.cs" />
<Compile Include="Normalizers.cs" />
<Compile Include="RegistrationComparerCollectorLogic.cs" />
<Compile Include="HiveComparer.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,34 @@ public async Task OnProcessBatchAsync(IEnumerable<CatalogCommitItem> items)
.Registrations
.SelectMany(x => new[]
{
new { Hive = nameof(x.LegacyBaseUrl), Url = x.LegacyBaseUrl.TrimEnd('/') + '/' },
new { Hive = nameof(x.GzippedBaseUrl), Url = x.GzippedBaseUrl.TrimEnd('/') + '/' },
new { Hive = nameof(x.SemVer2BaseUrl), Url = x.SemVer2BaseUrl.TrimEnd('/') + '/' },
new
{
Hive = nameof(x.Legacy),
BaseUrl = x.Legacy.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.Legacy.StorageBaseUrl.TrimEnd('/') + '/',
},
new
{
Hive = nameof(x.Gzipped),
BaseUrl = x.Gzipped.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.Gzipped.StorageBaseUrl.TrimEnd('/') + '/',
},
new
{
Hive = nameof(x.SemVer2),
BaseUrl = x.SemVer2.BaseUrl.TrimEnd('/') + '/',
StorageBaseUrl = x.SemVer2.StorageBaseUrl.TrimEnd('/') + '/'
},
})
.GroupBy(x => x.Hive, x => x.Url);
.GroupBy(x => x.Hive, x => new HiveConfiguration { BaseUrl = x.BaseUrl, StorageBaseUrl = x.StorageBaseUrl });

var allWork = new ConcurrentBag<Func<Task>>();
var failures = 0;
foreach (var group in packageIdGroups)
{
foreach (var hiveGroup in hiveGroups)
{
var baseUrls = hiveGroup.ToList();
var hives = hiveGroup.ToList();
var hive = hiveGroup.Key;
var id = group.Id;
var versions = group.Versions;
Expand All @@ -83,7 +98,7 @@ public async Task OnProcessBatchAsync(IEnumerable<CatalogCommitItem> items)
_logger.LogInformation("Verifying hive {Hive} for {PackageId}.", hive, id);
try
{
await _comparer.CompareAsync(baseUrls, id, versions);
await _comparer.CompareAsync(hives, id, versions);
}
catch (Exception ex)
{
Expand Down