Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
/ NuGet.Jobs Public archive

Commit

Permalink
Add untested code changes
Browse files Browse the repository at this point in the history
Update tests and config

Improving logging

Undo hack :'(

Update connection strings
  • Loading branch information
loic-sharma committed Nov 12, 2018
1 parent 763f5d0 commit e7367ba
Show file tree
Hide file tree
Showing 22 changed files with 367 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,13 @@ namespace NuGet.Services.Revalidate
public class RevalidationQueueConfiguration
{
/// <summary>
/// If non-null, this skips revalidations of packages with more than this many versions.
/// The maximum number of revalidations that should be returned by <see cref="IRevalidationQueue.NextAsync"/>.
/// </summary>
public int? MaximumPackageVersions { get; set; }
public int MaxBatchSize { get; set; } = 64;

/// <summary>
/// The maximum times that the <see cref="RevalidationQueue"/> should look for a revalidation
/// before giving up.
/// </summary>
public int MaximumAttempts { get; set; } = 5;

/// <summary>
/// The time to sleep after an initialized revalidation is deemed completed.
/// If non-null, this skips revalidations of packages with more than this many versions.
/// </summary>
public TimeSpan SleepBetweenAttempts { get; set; } = TimeSpan.FromSeconds(5);
public int? MaximumPackageVersions { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<Compile Include="Initialization\PackageRevalidationInserter.cs" />
<Compile Include="Services\RevalidationOperation.cs" />
<Compile Include="Services\RevalidationQueue.cs" />
<Compile Include="Services\RevalidationResult.cs" />
<Compile Include="Services\StartRevalidationStatus.cs" />
<Compile Include="Services\RevalidationService.cs" />
<Compile Include="Services\RevalidationJobStateService.cs" />
<Compile Include="Services\PackageRevalidationStateService.cs" />
Expand All @@ -82,6 +82,7 @@
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Properties\AssemblyInfo.*.cs" />
<Compile Include="Services\StartRevalidationResult.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public interface IPackageRevalidationStateService
Task<int> CountRevalidationsEnqueuedInPastHourAsync();

/// <summary>
/// Update the package revalidation and mark is as enqueued.
/// Update the package revalidations and mark them as enqueued.
/// </summary>
/// <param name="revalidation">The revalidation to update.</param>
/// <returns>A task that completes once the revalidation has been updated.</returns>
Task MarkPackageRevalidationAsEnqueuedAsync(PackageRevalidation revalidation);
/// <param name="revalidations">The revalidations to update.</param>
/// <returns>A task that completes once the revalidations have been updated.</returns>
Task MarkPackageRevalidationsAsEnqueuedAsync(IReadOnlyList<PackageRevalidation> revalidations);
}
}
7 changes: 4 additions & 3 deletions src/NuGet.Services.Revalidate/Services/IRevalidationQueue.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading.Tasks;
using NuGet.Services.Validation;

Expand All @@ -9,9 +10,9 @@ namespace NuGet.Services.Revalidate
public interface IRevalidationQueue
{
/// <summary>
/// Fetch the next package to revalidate.
/// Fetch the next packages to revalidate.
/// </summary>
/// <returns>The next package to revalidate, or null if there are no packages to revalidate at this time.</returns>
Task<PackageRevalidation> NextOrNullAsync();
/// <returns>The next packages to revalidate, or an empty list if there are no packages to revalidate at this time.</returns>
Task<IReadOnlyList<PackageRevalidation>> NextAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ public interface IRevalidationStarter
/// 4. A revalidation could not be found at this time
/// </summary>
/// <returns>The result of the revalidation attempt.</returns>
Task<RevalidationResult> StartNextRevalidationAsync();
Task<StartRevalidationResult> StartNextRevalidationsAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ public interface IRevalidationThrottler
/// <summary>
/// Delay the current task to achieve the desired revalidation rate.
/// </summary>
/// <param name="started">The number of revalidations started.</param>
/// <returns>Delay the task to ensure the desired revalidation rate.</returns>
Task DelayUntilNextRevalidationAsync();
Task DelayUntilNextRevalidationAsync(int started);

/// <summary>
/// Delay the current task until when a revalidation can be retried.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,20 @@ public async Task<int> CountRevalidationsEnqueuedInPastHourAsync()
.CountAsync();
}

public async Task MarkPackageRevalidationAsEnqueuedAsync(PackageRevalidation revalidation)
public async Task MarkPackageRevalidationsAsEnqueuedAsync(IReadOnlyList<PackageRevalidation> revalidations)
{
try
{
revalidation.Enqueued = DateTime.UtcNow;
foreach (var revalidation in revalidations)
{
revalidation.Enqueued = DateTime.UtcNow;
}

await _context.SaveChangesAsync();
}
catch (DbUpdateConcurrencyException)
{
_logger.LogWarning(
"Failed to update revalidation as enqueued for {PackageId} {PackageNormalizedVersion}",
revalidation.PackageId,
revalidation.PackageNormalizedVersion);

_logger.LogWarning("Failed to update revalidations as enqueued");
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ namespace NuGet.Services.Revalidate
public class StartNextRevalidationOperation
{
/// <summary>
/// The result of attempting to start the next revalidation.
/// The result of attempting to start the next revalidations.
/// </summary>
public RevalidationResult Result { get; set; }
public StartRevalidationStatus Result { get; set; }

/// <summary>
/// The number of revalidations started.
/// </summary>
public int Started { get; set; }
}
}
169 changes: 92 additions & 77 deletions src/NuGet.Services.Revalidate/Services/RevalidationQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Data.Entity;
using System.Data.Entity.Infrastructure;
using System.Linq;
Expand Down Expand Up @@ -37,106 +38,120 @@ public RevalidationQueue(
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public async Task<PackageRevalidation> NextOrNullAsync()
public async Task<IReadOnlyList<PackageRevalidation>> NextAsync()
{
for (var i = 0; i < _config.MaximumAttempts; i++)
{
_logger.LogInformation(
"Attempting to find the next revalidation. Try {Attempt} of {MaxAttempts}",
i + 1,
_config.MaximumAttempts);

// Find the next package to revalidate. We will skip packages if:
// 1. The package has more than "MaximumPackageVersions" versions
// 2. The package has already been enqueued for revalidation
// 3. The package's revalidation was completed by an external factory (like manual admin revalidation)
IQueryable<PackageRevalidation> query = _validationContext.PackageRevalidations;

if (_config.MaximumPackageVersions.HasValue)
{
query = query.Where(
r =>
!_validationContext.PackageRevalidations.GroupBy(r2 => r2.PackageId)
.Where(g => g.Count() > _config.MaximumPackageVersions)
.Any(g => g.Key == r.PackageId));
}

var next = await query
.Where(r => r.Enqueued == null)
.Where(r => r.Completed == false)
.OrderBy(r => r.Key)
.FirstOrDefaultAsync();

if (next == null)
{
_logger.LogWarning("Could not find any incomplete revalidations");
return null;
}

// Don't revalidate packages that already have a repository signature or that no longer exist.
if (await HasRepositorySignature(next) || await IsDeleted(next))
{
await MarkAsCompleted(next);
await Task.Delay(_config.SleepBetweenAttempts);
// Find the next package to revalidate. We will skip packages if:
// 1. The package has more than "MaximumPackageVersions" versions
// 2. The package has already been enqueued for revalidation
// 3. The package's revalidation was completed by an external factory (like manual admin revalidation)
IQueryable<PackageRevalidation> query = _validationContext.PackageRevalidations;

continue;
}
if (_config.MaximumPackageVersions.HasValue)
{
query = query.Where(
r =>
!_validationContext.PackageRevalidations.GroupBy(r2 => r2.PackageId)
.Where(g => g.Count() > _config.MaximumPackageVersions)
.Any(g => g.Key == r.PackageId));
}

_logger.LogInformation(
"Found revalidation for {PackageId} {PackageNormalizedVersion} after {Attempt} attempts",
next.PackageId,
next.PackageNormalizedVersion,
i + 1);
var next = await query
.Where(r => r.Enqueued == null)
.Where(r => r.Completed == false)
.OrderBy(r => r.Key)
.Take(_config.MaxBatchSize)
.ToListAsync();

if (!next.Any())
{
_logger.LogWarning("Could not find any incomplete revalidations");
return next;
}

_logger.LogInformation(
"Did not find any revalidations after {MaxAttempts}. Retry later...",
_config.MaximumAttempts);

return null;
// Return all the revalidations that aren't already completed.
return await FilterCompletedRevalidationsAsync(next);
}

private Task<bool> HasRepositorySignature(PackageRevalidation revalidation)
private async Task<List<PackageRevalidation>> FilterCompletedRevalidationsAsync(IReadOnlyList<PackageRevalidation> revalidations)
{
return _validationContext.PackageSigningStates
.Where(s => s.PackageId == revalidation.PackageId)
.Where(s => s.PackageNormalizedVersion == revalidation.PackageNormalizedVersion)
// Split the list of revalidations by which ones have been completed.
var completed = new List<PackageRevalidation>();
var uncompleted = revalidations.ToDictionary(
r => Tuple.Create(r.PackageId, r.PackageNormalizedVersion),
r => r);

// Seperate out packages that already have a repository signature.
var hasRepositorySignatures = await _validationContext.PackageSigningStates
.Where(s => revalidations.Any(r => r.PackageId == s.PackageId && r.PackageNormalizedVersion == s.PackageNormalizedVersion))
.Where(s => s.PackageSignatures.Any(sig => sig.Type == PackageSignatureType.Repository))
.AnyAsync();
}
.Select(s => new { s.PackageId, s.PackageNormalizedVersion })
.ToListAsync();

private async Task<bool> IsDeleted(PackageRevalidation revalidation)
{
var packageStatus = await _galleryContext.Set<Package>()
.Where(p => p.PackageRegistration.Id == revalidation.PackageId)
.Where(p => p.NormalizedVersion == revalidation.PackageNormalizedVersion)
.Select(p => (PackageStatus?)p.PackageStatusKey)
.FirstOrDefaultAsync();
foreach (var package in hasRepositorySignatures)
{
var key = Tuple.Create(package.PackageId, package.PackageNormalizedVersion);

completed.Add(uncompleted[key]);
uncompleted.Remove(key);
}

return (packageStatus == null || packageStatus == PackageStatus.Deleted);
// Separate out packages that are no longer available. We consider that a revalidation
// is "completed" if a package no longer exists.
var packageStatuses = await _galleryContext.Set<Package>()
.Where(p => uncompleted.Any(r => r.Key.Item1 == p.PackageRegistration.Id && r.Key.Item2 == p.NormalizedVersion))
.ToDictionaryAsync(
p => Tuple.Create(p.PackageRegistration.Id, p.NormalizedVersion),
p => p.PackageStatusKey);

foreach (var key in uncompleted.Keys.ToList())
{
// Packages that are hard deleted won't have a status.
if (!packageStatuses.TryGetValue(key, out var status) || status == PackageStatus.Deleted)
{
completed.Add(uncompleted[key]);
uncompleted.Remove(key);
continue;
}
}

// Update revalidations that were determined to be completed and return the remaining revalidations.
await MarkRevalidationsAsCompletedAsync(completed);
return uncompleted.Values.ToList();
}

private async Task MarkAsCompleted(PackageRevalidation revalidation)
private async Task MarkRevalidationsAsCompletedAsync(IReadOnlyList<PackageRevalidation> revalidations)
{
_logger.LogInformation(
"Marking package revalidation as completed as it has a repository signature or is deleted for {PackageId} {PackageNormalizedVersion}",
revalidation.PackageId,
revalidation.PackageNormalizedVersion);

try
{
revalidation.Completed = true;
foreach (var revalidation in revalidations)
{
_logger.LogInformation(
"Marking package revalidation as completed as it has a repository signature or is deleted for {PackageId} {PackageNormalizedVersion}...",
revalidation.PackageId,
revalidation.PackageNormalizedVersion);

revalidation.Completed = true;
}

await _validationContext.SaveChangesAsync();

_telemetry.TrackPackageRevalidationMarkedAsCompleted(revalidation.PackageId, revalidation.PackageNormalizedVersion);
foreach (var revalidation in revalidations)
{
_logger.LogInformation(
"Marked package revalidation as completed as it has a repository signature or is deleted for {PackageId} {PackageNormalizedVersion}",
revalidation.PackageId,
revalidation.PackageNormalizedVersion);

_telemetry.TrackPackageRevalidationMarkedAsCompleted(revalidation.PackageId, revalidation.PackageNormalizedVersion);
}
}
catch (DbUpdateConcurrencyException)
catch (DbUpdateConcurrencyException e)
{
// Swallow concurrency exceptions. The package will be marked as completed
// on the next iteration of "NextOrNullAsync".
_logger.LogError(
0,
e,
"Failed to mark package revalidations as completed. " +
$"These revalidations will be marked as completed on the next iteration of {nameof(NextAsync)}...");
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions src/NuGet.Services.Revalidate/Services/RevalidationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,23 @@ public async Task RunAsync()
{
_logger.LogInformation("Starting next revalidation...");

var result = await StartNextRevalidationAsync();
var result = await StartNextRevalidationsAsync();

switch (result)
switch (result.Status)
{
case RevalidationResult.RevalidationEnqueued:
_logger.LogInformation("Successfully enqueued revalidation");
case StartRevalidationStatus.RevalidationsEnqueued:
_logger.LogInformation("Successfully enqueued revalidations");

await _throttler.DelayUntilNextRevalidationAsync();
await _throttler.DelayUntilNextRevalidationAsync(result.RevalidationsStarted);
break;

case RevalidationResult.RetryLater:
case StartRevalidationStatus.RetryLater:
_logger.LogInformation("Could not start revalidation, retrying later");

await _throttler.DelayUntilRevalidationRetryAsync();
break;

case RevalidationResult.UnrecoverableError:
case StartRevalidationStatus.UnrecoverableError:
default:
_logger.LogCritical(
"Stopping revalidations due to unrecoverable or unknown result {Result}",
Expand All @@ -79,15 +79,16 @@ public async Task RunAsync()
_logger.LogInformation("Finished running after {ElapsedTime}", runTime.Elapsed);
}

private async Task<RevalidationResult> StartNextRevalidationAsync()
private async Task<StartRevalidationResult> StartNextRevalidationsAsync()
{
using (var operation = _telemetryService.TrackStartNextRevalidationOperation())
using (var scope = _scopeFactory.CreateScope())
{
var starter = scope.ServiceProvider.GetRequiredService<IRevalidationStarter>();
var result = await starter.StartNextRevalidationAsync();
var result = await starter.StartNextRevalidationsAsync();

operation.Properties.Result = result;
operation.Properties.Result = result.Status;
operation.Properties.Started = result.RevalidationsStarted;

return result;
}
Expand Down
Loading

0 comments on commit e7367ba

Please sign in to comment.