Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
/ NuGet.Jobs Public archive

Commit

Permalink
Handle "check validator" message in orchestrator (#779)
Browse files Browse the repository at this point in the history
  • Loading branch information
joelverhagen committed Jul 25, 2019
1 parent c206163 commit ca5b9aa
Show file tree
Hide file tree
Showing 26 changed files with 815 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
<Version>5.8.4</Version>
</PackageReference>
<PackageReference Include="NuGet.Services.Storage">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
</ItemGroup>
<PropertyGroup>
Expand Down
10 changes: 5 additions & 5 deletions src/NuGet.Jobs.Common/NuGet.Jobs.Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -107,19 +107,19 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="NuGet.Services.Configuration">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
<PackageReference Include="NuGet.Services.Logging">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
<PackageReference Include="NuGet.Services.Messaging.Email">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
<PackageReference Include="NuGet.Services.Sql">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
<PackageReference Include="NuGet.Services.Status">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="NuGet.Services.Status">
<Version>2.52.0</Version>
<Version>2.53.0</Version>
</PackageReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
6 changes: 4 additions & 2 deletions src/NuGet.Services.Revalidate/Services/RevalidationStarter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,12 @@ private async Task<StartRevalidationResult> StartRevalidationsAsync(IReadOnlyLis
revalidation.PackageId,
revalidation.PackageNormalizedVersion);

var message = new PackageValidationMessageData(
var message = PackageValidationMessageData.NewProcessValidationSet(
revalidation.PackageId,
revalidation.PackageNormalizedVersion,
revalidation.ValidationTrackingId.Value);
revalidation.ValidationTrackingId.Value,
ValidatingType.Package,
entityKey: null);

await _validationEnqueuer.StartValidationAsync(message);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ public interface IValidationOutcomeProcessor<T> where T : class, IEntity
/// <param name="validatingEntity">The validating entity.</param>
/// <param name="currentCallStats">Contains information about what happened during current message processing in
/// the validation set processor.</param>
/// <param name="scheduleNextCheck">Whether or not the next check should be scheduled.</param>
/// <returns>A task that completes when the outcome has been processed</returns>
Task ProcessValidationOutcomeAsync(PackageValidationSet validationSet, IValidatingEntity<T> validatingEntity, ValidationSetProcessorResult currentCallStats);
Task ProcessValidationOutcomeAsync(
PackageValidationSet validationSet,
IValidatingEntity<T> validatingEntity,
ValidationSetProcessorResult currentCallStats,
bool scheduleNextCheck);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using NuGet.Services.Entities;

Expand All @@ -20,6 +21,14 @@ public interface IValidationSetProvider<T> where T : class, IEntity
/// requested <paramref name="validationTrackingId"/>. Null if no further processing
/// should be made (e.g. duplicate validation request was detected).
/// </returns>
Task<PackageValidationSet> TryGetOrCreateValidationSetAsync(PackageValidationMessageData message, IValidatingEntity<T> validatingEntity);
Task<PackageValidationSet> TryGetOrCreateValidationSetAsync(ProcessValidationSetData message, IValidatingEntity<T> validatingEntity);

/// <summary>
/// Reads a validation set from storage based given and ID of one of the validations in the set. If no such
/// validation exists, null is returned.
/// </summary>
/// <param name="validationId">The validation ID.</param>
/// <returns>The validation set, or null.</returns>
Task<PackageValidationSet> TryGetParentValidationSetAsync(Guid validationId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public interface IValidationStorageService
/// <returns>Validation set instance if found, null otherwise.</returns>
Task<PackageValidationSet> GetValidationSetAsync(Guid validationTrackingId);

/// <summary>
/// Reads a validation set from storage based given and ID of one of the validations in the set. If no such
/// validation exists, null is returned.
/// </summary>
/// <param name="validationId">The validation ID.</param>
/// <returns>The validation set, or null.</returns>
Task<PackageValidationSet> TryGetParentValidationSetAsync(Guid validationId);

/// <summary>
/// Gets the number of validation sets that the provided entity has.
/// </summary>
Expand Down
3 changes: 1 addition & 2 deletions src/NuGet.Services.Validation.Orchestrator/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,7 @@ private static IServiceProvider CreateProvider(IServiceCollection services, ICon
(pi, ctx) => ctx.ResolveKeyed<TopicClientWrapper>(PackageVerificationTopicClientBindingKey)))
.WithParameter(new ResolvedParameter(
(pi, ctx) => pi.ParameterType == typeof(IBrokeredMessageSerializer<SignatureValidationMessage>),
(pi, ctx) => ctx.Resolve<SignatureValidationMessageSerializer>()
))
(pi, ctx) => ctx.Resolve<SignatureValidationMessageSerializer>()))
.As<IProcessSignatureEnqueuer>();

containerBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,74 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
throw new ArgumentNullException(nameof(message));
}

switch (message.Type)
{
case PackageValidationMessageType.CheckValidator:
return await CheckValidatorAsync(message.CheckValidator);
case PackageValidationMessageType.ProcessValidationSet:
return await ProcessValidationSetAsync(message.ProcessValidationSet, message.DeliveryCount);
default:
throw new NotSupportedException($"The package validation message type '{message.Type}' is not supported.");
}
}

private async Task<bool> CheckValidatorAsync(CheckValidatorData message)
{
PackageValidationSet validationSet;
IValidatingEntity<Package> package;
using (_logger.BeginScope("Finding validation set and package for validation ID {ValidationId}", message.ValidationId))
{
validationSet = await _validationSetProvider.TryGetParentValidationSetAsync(message.ValidationId);
if (validationSet == null)
{
_logger.LogError("Could not find validation set for {ValidationId}.", message.ValidationId);
return false;
}

if (validationSet.ValidatingType != ValidatingType.Package)
{
_logger.LogError("Validation set {ValidationSetId} is not for a package.", message.ValidationId);
return false;
}

package = _galleryPackageService.FindPackageByKey(validationSet.PackageKey);
if (package == null)
{
_logger.LogError(
"Could not find package {PackageId} {PackageVersion} for validation set {ValidationSetId}.",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.ValidationTrackingId);
return false;
}

// Immediately halt validation of a soft deleted package.
if (package.Status == PackageStatus.Deleted)
{
_logger.LogWarning(
"Package {PackageId} {PackageNormalizedVersion} (package key {PackageKey}) is soft deleted. Dropping message for validation set {ValidationSetId}.",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
package.Key,
validationSet.ValidationTrackingId);

return true;
}
}

using (_logger.BeginScope("Handling check validator message for {PackageId} {PackageVersion} validation set {ValidationSetId}",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.ValidationTrackingId))
{
await ProcessValidationSetAsync(package, validationSet, scheduleNextCheck: false);
}

return true;
}

private async Task<bool> ProcessValidationSetAsync(ProcessValidationSetData message, int deliveryCount)
{
using (_logger.BeginScope("Handling message for {PackageId} {PackageVersion} validation set {ValidationSetId}",
message.PackageId,
message.PackageNormalizedVersion,
Expand All @@ -75,12 +143,12 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
if (package == null)
{
// no package in DB yet. Might have received message a bit early, need to retry later
if (message.DeliveryCount - 1 >= _configs.MissingPackageRetryCount)
if (deliveryCount - 1 >= _configs.MissingPackageRetryCount)
{
_logger.LogWarning("Could not find package {PackageId} {PackageNormalizedVersion} in DB after {DeliveryCount} tries, dropping message",
message.PackageId,
message.PackageNormalizedVersion,
message.DeliveryCount);
deliveryCount);

_telemetryService.TrackMissingPackageForValidationMessage(
message.PackageId,
Expand Down Expand Up @@ -125,21 +193,35 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
return true;
}

if (validationSet.ValidationSetStatus == ValidationSetStatus.Completed)
{
_logger.LogInformation(
"The validation set {PackageId} {PackageNormalizedVersion} {ValidationSetId} is already " +
"completed. Discarding the message.",
message.PackageId,
message.PackageNormalizedVersion,
message.ValidationTrackingId);
return true;
}

var processorStats = await _validationSetProcessor.ProcessValidationsAsync(validationSet);
await _validationOutcomeProcessor.ProcessValidationOutcomeAsync(validationSet, package, processorStats);
await ProcessValidationSetAsync(package, validationSet, scheduleNextCheck: true);
}

return true;
}

private async Task ProcessValidationSetAsync(
IValidatingEntity<Package> package,
PackageValidationSet validationSet,
bool scheduleNextCheck)
{
if (validationSet.ValidationSetStatus == ValidationSetStatus.Completed)
{
_logger.LogInformation(
"The validation set {PackageId} {PackageNormalizedVersion} {ValidationSetId} is already " +
"completed. Discarding the message.",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.ValidationTrackingId);
return;
}

var processorStats = await _validationSetProcessor.ProcessValidationsAsync(validationSet);

await _validationOutcomeProcessor.ProcessValidationOutcomeAsync(
validationSet,
package,
processorStats,
scheduleNextCheck);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,63 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
throw new ArgumentNullException(nameof(message));
}

switch (message.Type)
{
case PackageValidationMessageType.CheckValidator:
return await CheckValidatorAsync(message.CheckValidator);
case PackageValidationMessageType.ProcessValidationSet:
return await ProcessValidationSetAsync(message.ProcessValidationSet, message.DeliveryCount);
default:
throw new NotSupportedException($"The package validation message type '{message.Type}' is not supported.");
}
}

private async Task<bool> CheckValidatorAsync(CheckValidatorData message)
{
PackageValidationSet validationSet;
IValidatingEntity<SymbolPackage> symbolPackageEntity;
using (_logger.BeginScope("Finding validation set and symbol package for validation ID {ValidationId}", message.ValidationId))
{
validationSet = await _validationSetProvider.TryGetParentValidationSetAsync(message.ValidationId);
if (validationSet == null)
{
_logger.LogError("Could not find validation set for {ValidationId}.", message.ValidationId);
return false;
}

if (validationSet.ValidatingType != ValidatingType.SymbolPackage)
{
_logger.LogError("Validation set {ValidationSetId} is not for a symbol package.", message.ValidationId);
return false;
}

symbolPackageEntity = _gallerySymbolService.FindPackageByKey(validationSet.PackageKey);
if (symbolPackageEntity == null)
{
_logger.LogError(
"Could not find symbol package {PackageId} {PackageVersion} {Key} for validation set {ValidationSetId}.",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.PackageKey,
validationSet.ValidationTrackingId);
return false;
}
}

using (_logger.BeginScope("Handling check symbol validator message for {PackageId} {PackageVersion} {Key} validation set {ValidationSetId}",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.PackageKey,
validationSet.ValidationTrackingId))
{
await ProcessValidationSetAsync(symbolPackageEntity, validationSet, scheduleNextCheck: false);
}

return true;
}

private async Task<bool> ProcessValidationSetAsync(ProcessValidationSetData message, int deliveryCount)
{
using (_logger.BeginScope("Handling symbol message for {PackageId} {PackageVersion} validation set {ValidationSetId}",
message.PackageId,
message.PackageNormalizedVersion,
Expand All @@ -82,12 +139,12 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
if (symbolPackageEntity == null)
{
// no package in DB yet. Might have received message a bit early, need to retry later
if (message.DeliveryCount - 1 >= _configs.MissingPackageRetryCount)
if (deliveryCount - 1 >= _configs.MissingPackageRetryCount)
{
_logger.LogWarning("Could not find symbols for package {PackageId} {PackageNormalizedVersion} in DB after {DeliveryCount} tries, dropping message",
message.PackageId,
message.PackageNormalizedVersion,
message.DeliveryCount);
deliveryCount);

_telemetryService.TrackMissingPackageForValidationMessage(
message.PackageId,
Expand Down Expand Up @@ -120,23 +177,35 @@ public async Task<bool> HandleAsync(PackageValidationMessageData message)
return true;
}

if (validationSet.ValidationSetStatus == ValidationSetStatus.Completed)
{
_logger.LogInformation(
"The validation set {PackageId} {PackageNormalizedVersion} {ValidationSetId} is already " +
"completed. Discarding the message.",
message.PackageId,
message.PackageNormalizedVersion,
message.ValidationTrackingId);
return true;
}

var processorStats = await _validationSetProcessor.ProcessValidationsAsync(validationSet);
// As part of the processing the validation outcome the orchestrator will send itself a message if validation are still being processed.
await _validationOutcomeProcessor.ProcessValidationOutcomeAsync(validationSet, symbolPackageEntity, processorStats);
await ProcessValidationSetAsync(symbolPackageEntity, validationSet, scheduleNextCheck: true);
}

return true;
}

private async Task ProcessValidationSetAsync(
IValidatingEntity<SymbolPackage> symbolPackageEntity,
PackageValidationSet validationSet,
bool scheduleNextCheck)
{
if (validationSet.ValidationSetStatus == ValidationSetStatus.Completed)
{
_logger.LogInformation(
"The validation set {PackageId} {PackageNormalizedVersion} {ValidationSetId} is already " +
"completed. Discarding the message.",
validationSet.PackageId,
validationSet.PackageNormalizedVersion,
validationSet.ValidationTrackingId);
return;
}

var processorStats = await _validationSetProcessor.ProcessValidationsAsync(validationSet);

await _validationOutcomeProcessor.ProcessValidationOutcomeAsync(
validationSet,
symbolPackageEntity,
processorStats,
scheduleNextCheck);
}
}
}
Loading

0 comments on commit ca5b9aa

Please sign in to comment.