Skip to content
This repository has been archived by the owner on Jul 30, 2024. It is now read-only.
/ NuGet.Jobs Public archive

Commit

Permalink
Create new scope for each initialization batch (#529)
Browse files Browse the repository at this point in the history
Split up the initialization phase into multiple scopes. Each phase uses its own scope, and each batch within each phase uses its own scope (as there's a 30 second sleep time between initialization batches).
  • Loading branch information
loic-sharma authored Aug 10, 2018
1 parent b01d02f commit df60067
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 51 deletions.
102 changes: 65 additions & 37 deletions src/NuGet.Services.Revalidate/Initialization/InitializationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NuGet.Services.Validation;
using NuGet.Versioning;
Expand All @@ -18,19 +19,22 @@ public class InitializationManager
private readonly IRevalidationJobStateService _jobState;
private readonly IPackageRevalidationStateService _packageState;
private readonly IPackageFinder _packageFinder;
private readonly IServiceScopeFactory _scopeFactory;
private readonly InitializationConfiguration _config;
private readonly ILogger<InitializationManager> _logger;

public InitializationManager(
IRevalidationJobStateService jobState,
IPackageRevalidationStateService packageState,
IPackageFinder packageFinder,
IServiceScopeFactory scopeFactory,
InitializationConfiguration config,
ILogger<InitializationManager> logger)
{
_jobState = jobState ?? throw new ArgumentNullException(nameof(jobState));
_packageState = packageState ?? throw new ArgumentNullException(nameof(packageState));
_packageFinder = packageFinder ?? throw new ArgumentNullException(nameof(packageFinder));
_scopeFactory = scopeFactory ?? throw new ArgumentNullException(nameof(scopeFactory));
_config = config ?? throw new ArgumentNullException(nameof(config));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
Expand Down Expand Up @@ -122,59 +126,83 @@ private async Task ClearPackageRevalidationStateAsync()

private async Task InitializePackageSetAsync(string setName, HashSet<int> packageRegistrationKeys)
{
var packageInformations = await _packageFinder.FindPackageRegistrationInformationAsync(setName, packageRegistrationKeys);
using (var scope = _scopeFactory.CreateScope())
{
var scopedPackageFinder = scope.ServiceProvider.GetRequiredService<IPackageFinder>();
var scopedJobState = scope.ServiceProvider.GetRequiredService<IRevalidationJobStateService>();
var scopedScopeFactory = scope.ServiceProvider.GetRequiredService<IServiceScopeFactory>();

var chunks = packageInformations
.OrderByDescending(p => p.Downloads)
.WeightedBatch(BatchSize, p => p.Versions);
var packageInformations = await scopedPackageFinder.FindPackageRegistrationInformationAsync(setName, packageRegistrationKeys);
var chunks = packageInformations
.OrderByDescending(p => p.Downloads)
.WeightedBatch(BatchSize, p => p.Versions);

for (var chunkIndex = 0; chunkIndex < chunks.Count; chunkIndex++)
{
while (await _jobState.IsKillswitchActiveAsync())
for (var chunkIndex = 0; chunkIndex < chunks.Count; chunkIndex++)
{
_logger.LogInformation(
"Delaying initialization of chunk {Chunk} of {Chunks} for package set {SetName} due to active killswitch",
chunkIndex + 1,
chunks.Count,
setName);
while (await scopedJobState.IsKillswitchActiveAsync())
{
_logger.LogInformation(
"Delaying initialization of chunk {Chunk} of {Chunks} for package set {SetName} due to active killswitch",
chunkIndex + 1,
chunks.Count,
setName);

await Task.Delay(_config.SleepDurationBetweenBatches);
await Task.Delay(_config.SleepDurationBetweenBatches);
}

await InitializePackageSetChunkAsync(setName, chunks, chunkIndex, scopedScopeFactory, _logger);

// Sleep if this is not the last chunk to prevent overloading the database.
if (chunkIndex < chunks.Count - 1)
{
_logger.LogInformation(
"Sleeping for {SleepDuration} before initializing the next chunk...",
_config.SleepDurationBetweenBatches);

await Task.Delay(_config.SleepDurationBetweenBatches);
}
}

_logger.LogInformation(
"Initializing chunk {Chunk} of {Chunks} for package set {SetName}...",
chunkIndex + 1,
chunks.Count,
setName);
_logger.LogInformation("Finished initializing package set {SetName}", setName);
}
}

private static async Task InitializePackageSetChunkAsync(
string setName,
List<List<PackageRegistrationInformation>> chunks,
int chunkIndex,
IServiceScopeFactory scopeFactory,
ILogger<InitializationManager> logger)
{
logger.LogInformation(
"Initializing chunk {Chunk} of {Chunks} for package set {SetName}...",
chunkIndex + 1,
chunks.Count,
setName);

using (var scope = scopeFactory.CreateScope())
{
var scopedPackageState = scope.ServiceProvider.GetRequiredService<IPackageRevalidationStateService>();
var scopedPackageFinder = scope.ServiceProvider.GetRequiredService<IPackageFinder>();

var chunk = chunks[chunkIndex];
var versions = _packageFinder.FindAppropriateVersions(chunk);
var versions = scopedPackageFinder.FindAppropriateVersions(chunk);

await InitializeRevalidationsAsync(chunk, versions);
await InitializeRevalidationsAsync(chunk, versions, scopedPackageState, logger);

_logger.LogInformation(
logger.LogInformation(
"Initialized chunk {Chunk} of {Chunks} for package set {SetName}",
chunkIndex + 1,
chunks.Count,
setName);

// Sleep if this is not the last chunk to prevent overloading the database.
if (chunkIndex < chunks.Count - 1)
{
_logger.LogInformation(
"Sleeping for {SleepDuration} before initializing the next chunk...",
_config.SleepDurationBetweenBatches);

await Task.Delay(_config.SleepDurationBetweenBatches);
}
}

_logger.LogInformation("Finished initializing package set {SetName}", setName);
}

private async Task InitializeRevalidationsAsync(
private static async Task InitializeRevalidationsAsync(
List<PackageRegistrationInformation> packageRegistrations,
Dictionary<int, List<NuGetVersion>> versions)
Dictionary<int, List<NuGetVersion>> versions,
IPackageRevalidationStateService packageState,
ILogger<InitializationManager> logger)
{
var revalidations = new List<PackageRevalidation>();

Expand All @@ -184,7 +212,7 @@ private async Task InitializeRevalidationsAsync(

if (!versions.ContainsKey(packageRegistration.Key) || versions[packageRegistration.Key].Count == 0)
{
_logger.LogWarning("Could not find any versions of package {PackageId} to revalidate", packageId);
logger.LogWarning("Could not find any versions of package {PackageId} to revalidate", packageId);

continue;
}
Expand All @@ -205,7 +233,7 @@ private async Task InitializeRevalidationsAsync(
}
}

await _packageState.AddPackageRevalidationsAsync(revalidations);
await packageState.AddPackageRevalidationsAsync(revalidations);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public PackageRevalidationStateService(IValidationEntitiesContext context, ILogg

public async Task AddPackageRevalidationsAsync(IReadOnlyList<PackageRevalidation> revalidations)
{
_logger.LogDebug("Persisting package revalidations to database...");

var validationContext = _context as ValidationEntitiesContext;

if (validationContext != null)
Expand All @@ -38,13 +40,19 @@ public async Task AddPackageRevalidationsAsync(IReadOnlyList<PackageRevalidation
_context.PackageRevalidations.Add(revalidation);
}

_logger.LogDebug("Saving the validation context...");

await _context.SaveChangesAsync();

_logger.LogDebug("Finished saving the validation context...");

if (validationContext != null)
{
validationContext.Configuration.AutoDetectChangesEnabled = true;
validationContext.Configuration.ValidateOnSaveEnabled = true;
}

_logger.LogDebug("Finished persisting package revalidations to database...");
}

public async Task<int> RemovePackageRevalidationsAsync(int max)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Moq;
using NuGet.Services.Validation;
Expand All @@ -22,7 +23,7 @@ public class TheInitializeAsyncMethod : FactsBase
public async Task ThrowsIfAlreadyInitialized()
{
// Arrange
_settings.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);
_jobState.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);

// Act & Assert
var e = await Assert.ThrowsAsync<InvalidOperationException>(() => _target.InitializeAsync());
Expand Down Expand Up @@ -223,39 +224,43 @@ public async Task PartitionsPackagesIntoBatchesOf1000OrLessVersions(int[] packag
_packageState.Verify(
s => s.AddPackageRevalidationsAsync(It.IsAny<IReadOnlyList<PackageRevalidation>>()),
Times.Exactly(expectedBatches));

// A scope should be created for each package set. Also, a scope should be created
// for each batch.
_scopeFactory.Verify(f => f.CreateScope(), Times.Exactly(4 + expectedBatches));
}

public static IEnumerable<object[]> PartitionsPackagesIntoBatchesOf1000OrLessVersionsData()
{
yield return new object[]
{
new[] { 1001 },
1
1,
};

yield return new object[]
{
new[] { 1, 1001, 1 },
3
3,
};

// Should be batched into two batches of 501 items.
yield return new object[]
{
new[] { 1, 500, 500, 1 },
2
2,
};

yield return new object[]
{
new[] { 500, 500 },
1
1,
};

yield return new object[]
{
Enumerable.Repeat(1, 1000).ToArray(),
1
1,
};
}

Expand All @@ -277,15 +282,15 @@ public async Task MarksAsInitializedAfterAddingRevalidations()
.Callback(() => addRevalidationOrder = order++)
.Returns(Task.CompletedTask);

_settings
_jobState
.Setup(s => s.MarkAsInitializedAsync())
.Callback(() => markAsInitializedOrder = order++)
.Returns(Task.CompletedTask);

// Act & Assert
await _target.InitializeAsync();

_settings.Verify(s => s.MarkAsInitializedAsync(), Times.Once);
_jobState.Verify(s => s.MarkAsInitializedAsync(), Times.Once);

Assert.True(markAsInitializedOrder > addRevalidationOrder);
}
Expand Down Expand Up @@ -396,7 +401,7 @@ public class TheVerifyAsyncMethod : FactsBase
[Fact]
public async Task ThrowsIfNotInitialized()
{
_settings.Setup(s => s.IsInitializedAsync()).ReturnsAsync(false);
_jobState.Setup(s => s.IsInitializedAsync()).ReturnsAsync(false);

var e = await Assert.ThrowsAsync<Exception>(() => _target.VerifyInitializationAsync());

Expand All @@ -406,7 +411,7 @@ public async Task ThrowsIfNotInitialized()
[Fact]
public async Task ThrowsIfAppropriatePackageCountDoesNotMatchRevalidationCount()
{
_settings.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);
_jobState.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);
_packageFinder.Setup(f => f.AppropriatePackageCount()).Returns(100);
_packageState.Setup(s => s.PackageRevalidationCountAsync()).ReturnsAsync(50);

Expand All @@ -418,7 +423,7 @@ public async Task ThrowsIfAppropriatePackageCountDoesNotMatchRevalidationCount()
[Fact]
public async Task DoesNotThrowIfCountsMatch()
{
_settings.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);
_jobState.Setup(s => s.IsInitializedAsync()).ReturnsAsync(true);
_packageFinder.Setup(f => f.AppropriatePackageCount()).Returns(100);
_packageState.Setup(s => s.PackageRevalidationCountAsync()).ReturnsAsync(100);

Expand All @@ -428,25 +433,39 @@ public async Task DoesNotThrowIfCountsMatch()

public class FactsBase
{
public readonly Mock<IRevalidationJobStateService> _settings;
public readonly Mock<IRevalidationJobStateService> _jobState;
public readonly Mock<IPackageRevalidationStateService> _packageState;
public readonly Mock<IPackageFinder> _packageFinder;
public readonly Mock<IServiceScopeFactory> _scopeFactory;

public readonly InitializationConfiguration _config;
public readonly InitializationManager _target;

public FactsBase()
{
_settings = new Mock<IRevalidationJobStateService>();
_jobState = new Mock<IRevalidationJobStateService>();
_packageState = new Mock<IPackageRevalidationStateService>();
_packageFinder = new Mock<IPackageFinder>();
_scopeFactory = new Mock<IServiceScopeFactory>();

var scope = new Mock<IServiceScope>();
var serviceProvider = new Mock<IServiceProvider>();

serviceProvider.Setup(p => p.GetService(typeof(IRevalidationJobStateService))).Returns(_jobState.Object);
serviceProvider.Setup(p => p.GetService(typeof(IPackageRevalidationStateService))).Returns(_packageState.Object);
serviceProvider.Setup(p => p.GetService(typeof(IPackageFinder))).Returns(_packageFinder.Object);
serviceProvider.Setup(p => p.GetService(typeof(IServiceScopeFactory))).Returns(_scopeFactory.Object);

scope.Setup(s => s.ServiceProvider).Returns(serviceProvider.Object);
_scopeFactory.Setup(s => s.CreateScope()).Returns(scope.Object);

_config = new InitializationConfiguration();

_target = new InitializationManager(
_settings.Object,
_jobState.Object,
_packageState.Object,
_packageFinder.Object,
_scopeFactory.Object,
_config,
Mock.Of<ILogger<InitializationManager>>());
}
Expand Down

0 comments on commit df60067

Please sign in to comment.