Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean idempotent consumers periodically #147

Merged
merged 2 commits into from
May 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/Application/IdempotentConsumers/CleanIdempotentConsumers.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using SiteWatcher.Application.Interfaces;
using SiteWatcher.Domain.Authentication;

namespace SiteWatcher.Application.IdempotentConsumers;

public sealed class CleanIdempotentConsumers
{
private readonly ISiteWatcherContext _context;
private readonly ILogger<CleanIdempotentConsumers> _logger;
private readonly ISession _session;

public CleanIdempotentConsumers(ISiteWatcherContext context, ILogger<CleanIdempotentConsumers> logger, ISession session)
{
_context = context;
_logger = logger;
_session = session;
}

public async Task Clean(CancellationToken cancellationToken)
{
var fiveDaysEarlier = _session.Now.Date.AddDays(-5);

var rowsDeleted = await _context.IdempotentConsumers
.Where(i => i.DateCreated < fiveDaysEarlier)
.ExecuteDeleteAsync(cancellationToken);

_logger.LogInformation("{Date} - IdempotentConsumers cleaned: {Rows} rows deleted",
_session.Now, rowsDeleted);
}
}
32 changes: 32 additions & 0 deletions src/Worker/Jobs/CleanIdempotentConsumersPeriodically.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SiteWatcher.Application.IdempotentConsumers;
using SiteWatcher.Application.Interfaces;

namespace SiteWatcher.Worker.Jobs;

public sealed class CleanIdempotentConsumersPeriodically : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly PeriodicTimer _timer;

public CleanIdempotentConsumersPeriodically(IServiceScopeFactory scopeProvider, IAppSettings settings)
{
_scopeFactory = scopeProvider;
_timer = new PeriodicTimer(settings.IsDevelopment ? TimeSpan.FromSeconds(15) : TimeSpan.FromDays(1));
}

protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
Task.Run(async () =>
{
while (!stoppingToken.IsCancellationRequested)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var handler = scope.ServiceProvider.GetRequiredService<CleanIdempotentConsumers>();

await handler.Clean(stoppingToken);

await _timer.WaitForNextTickAsync(stoppingToken);
}
}, stoppingToken);
}
8 changes: 6 additions & 2 deletions src/Worker/Jobs/JobConfigurator.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using SiteWatcher.Application;
using SiteWatcher.Application.Alerts.Commands.ExecuteAlerts;
using SiteWatcher.Application.IdempotentConsumers;
using SiteWatcher.Common.Services;
using SiteWatcher.Domain.DomainServices;
using SiteWatcher.Infra;
Expand All @@ -26,8 +27,11 @@ public static IServiceCollection SetupJobs(this IServiceCollection serviceCollec
if (!settings.EnableJobs)
return serviceCollection;

serviceCollection.AddHostedService<ExecuteAlertsPeriodically>();
serviceCollection.AddScoped<ExecuteAlertsCommandHandler>();
serviceCollection
.AddHostedService<ExecuteAlertsPeriodically>()
.AddScoped<ExecuteAlertsCommandHandler>()
.AddHostedService<CleanIdempotentConsumersPeriodically>()
.AddScoped<CleanIdempotentConsumers>();

return serviceCollection;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using FluentAssertions;
using IntegrationTests.Setup;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using SiteWatcher.Application.IdempotentConsumers;
using SiteWatcher.Domain.Authentication;
using SiteWatcher.Domain.Common.ValueObjects;
using SiteWatcher.IntegrationTests.Setup.TestServices;
using SiteWatcher.IntegrationTests.Setup.WebApplicationFactory;

namespace IntegrationTests.IdempotentConsumers;

public sealed class CleanIdempotentConsumersTestBase : BaseTestFixture
{
public override async Task InitializeAsync()
{
await base.InitializeAsync();

await AppFactory.WithDbContext(ctx =>
{
AppFactory.CurrentTime = DateTime.UtcNow;
var fiveDaysAhead = DateTime.UtcNow.AddDays(5);

var idempotentConsumers = Enumerable.Range(0, 10)
.Select(x => new IdempotentConsumer
{
Consumer = "TestConsumer",
DateCreated = x < 8 ? AppFactory.CurrentTime : fiveDaysAhead,
MessageId = $"{x}"
});
ctx.IdempotentConsumers.AddRange(idempotentConsumers);
return ctx.SaveChangesAsync();
});
}
}

public sealed class CleanIdempotentConsumersTests : BaseTest, IClassFixture<CleanIdempotentConsumersTestBase>
{
public CleanIdempotentConsumersTests(CleanIdempotentConsumersTestBase fixture) : base(fixture)
{
}

[Fact]
public async Task IdempotentConsumersAreCleanedAfterFiveDays()
{
// Arrange
await using var scope = AppFactory.Services.CreateAsyncScope();
var session = scope.ServiceProvider.GetRequiredService<ISession>() as TestSession;
var handler = scope.ServiceProvider.GetRequiredService<CleanIdempotentConsumers>();
var initialCount = await GetIdempotentConsumersCount();

// Act & Assert

// First time nothing will be cleaned
await handler.Clean(CancellationToken.None);
var currentCount = await GetIdempotentConsumersCount();
currentCount.Should().Be(initialCount);

// After five days, 8 should be deleted
session!.SetNewDate(session.Now.Date.AddDays(6));
await handler.Clean(CancellationToken.None);
currentCount = await GetIdempotentConsumersCount();
currentCount.Should().Be(initialCount - 8);

// After another five days, the 3 left should be deleted
session!.SetNewDate(session.Now.Date.AddDays(5));
await handler.Clean(CancellationToken.None);
currentCount = await GetIdempotentConsumersCount();
currentCount.Should().Be(0);
}

private Task<int> GetIdempotentConsumersCount()
{
return AppFactory.WithDbContext(ctx => ctx.IdempotentConsumers.CountAsync());
}
}
4 changes: 3 additions & 1 deletion test/IntegrationTests/Setup/TestServices/TestSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public TestSession(IHttpContextAccessor httpContextAccessor, DateTime currentTim
_currentTime = currentTime;
}

private readonly DateTime _currentTime;
private DateTime _currentTime;
public override DateTime Now => _currentTime;

public void SetNewDate(DateTime newDate) => _currentTime = newDate;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using SiteWatcher.Application.Alerts.Commands.ExecuteAlerts;
using SiteWatcher.Application.Common.Queries;
using SiteWatcher.Application.Emails.Messages;
using SiteWatcher.Application.IdempotentConsumers;
using SiteWatcher.Application.Interfaces;
using SiteWatcher.Common.Services;
using SiteWatcher.Domain.Authentication.Services;
Expand Down Expand Up @@ -186,6 +187,9 @@ private void ReplaceServices(IServiceCollection services)
if (!_enableMasstransitTestHarness) return;
c.AddConsumers(typeof(SendEmailOnEmailCreatedMessageHandler).Assembly);
});

// IdempotentConsumers
services.AddScoped<CleanIdempotentConsumers>();
}

private void ConfigureOptionsReplacementServices(IServiceCollection services)
Expand Down
Loading