Skip to content

Commit

Permalink
feat(tasks): add maintenance and cleaner tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
pogromistik committed Oct 9, 2023
1 parent 3c96272 commit b4e69eb
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 1 deletion.
87 changes: 87 additions & 0 deletions src/Sitko.Core.Tasks/Tasks/CleanerTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System.Globalization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Sitko.Core.Repository.EntityFrameworkCore;
using Sitko.Core.Tasks.Data.Entities;
using Sitko.Core.Tasks.Data.Repository;

namespace Sitko.Core.Tasks.Tasks;

public class CleanerTask<TBaseTask> : BackgroundService
where TBaseTask : BaseTask
{
private readonly IServiceScopeFactory serviceScopeFactory;
private readonly IOptions<TasksModuleOptions> options;
private readonly ILogger<CleanerTask<TBaseTask>> logger;
private ITaskRepository<TBaseTask> tasksRepository;

public CleanerTask(IOptions<TasksModuleOptions> options, ILogger<CleanerTask<TBaseTask>> logger,
IServiceScopeFactory serviceScopeFactory)
{
this.options = options;
this.logger = logger;
this.serviceScopeFactory = serviceScopeFactory;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await using var scope = serviceScopeFactory.CreateAsyncScope();
tasksRepository = scope.ServiceProvider.GetRequiredService<ITaskRepository<TBaseTask>>();
if (options.Value.AllTasksRetentionDays is > 0)
{
var taskTypes = options.Value.RetentionDays.Select(r => r.Key).ToArray();
await RemoveTasks(options.Value.AllTasksRetentionDays.Value, false, taskTypes);
}

foreach (var (taskType, retentionDays) in options.Value.RetentionDays)
{
if (retentionDays > 0)
{
await RemoveTasks(retentionDays, true, new[] { taskType });
}
}

await Task.Delay(TimeSpan.FromDays(1), stoppingToken);
}
}

private async Task RemoveTasks(int retentionDays, bool include, string[] types)
{
var date = DateTimeOffset.UtcNow.AddDays(retentionDays * -1);
logger.LogInformation("Deleting tasks from {Date}. Types: {Types}, include: {Include}", date, types, include);
if (tasksRepository is IEFRepository efRepository)
{
var condition = $"\"{nameof(BaseTask.DateAdded)}\" < '{date.ToString("O", CultureInfo.InvariantCulture)}'";
if (types.Length > 0)
{
condition +=
$" AND \"{nameof(BaseTask.Type)}\" {(include ? "IN" : "NOT IN")} ({string.Join(",", types.Select(type => $"'{type}'"))})";
}

var cnt = await efRepository.DeleteAllRawAsync(condition);
logger.LogInformation("Deleted {Count} tasks", cnt);
}
else
{
var (tasks, tasksCount) =
await tasksRepository.GetAllAsync(query => query.Where(task => task.DateAdded < date &&
(types.Length == 0 ||
(include
? types.Contains(task.Type)
: !types.Contains(task.Type)))));
if (tasksCount > 0)
{
foreach (var task in tasks)
{
await tasksRepository.DeleteAsync(task);
}

logger.LogInformation("Deleted {Count} tasks", tasksCount);
}
}
}
}
70 changes: 70 additions & 0 deletions src/Sitko.Core.Tasks/Tasks/MaintenanceTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Sitko.Core.Tasks.Data;
using Sitko.Core.Tasks.Data.Entities;
using Sitko.Core.Tasks.Data.Repository;
using Sitko.Core.Tasks.Execution;
using TaskStatus = Sitko.Core.Tasks.Data.Entities.TaskStatus;

namespace Sitko.Core.Tasks.Tasks;

public class MaintenanceTask<TBaseTask, TDbContext> : BackgroundService
where TBaseTask : BaseTask where TDbContext : TasksDbContext<TBaseTask>
{
private readonly IOptions<TasksModuleOptions> options;
private readonly IServiceScopeFactory serviceScopeFactory;

public MaintenanceTask(IOptions<TasksModuleOptions> options,
IServiceScopeFactory serviceScopeFactory)
{
this.options = options;
this.serviceScopeFactory = serviceScopeFactory;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await using var scope = serviceScopeFactory.CreateAsyncScope();
var tasksRepository = scope.ServiceProvider.GetRequiredService<ITaskRepository<TBaseTask>>();
var inactivityDate = DateTimeOffset.UtcNow - options.Value.TasksInactivityTimeout;
var waitDate = DateTimeOffset.UtcNow - options.Value.TasksWaitTimeout;
var stuckTasks = await tasksRepository.GetAllAsync(query =>
query.Where(task =>
(task.TaskStatus == TaskStatus.InProgress && task.LastActivityDate < inactivityDate) ||
(task.TaskStatus == TaskStatus.Wait && task.DateAdded < waitDate)), stoppingToken);
if (stuckTasks.items.Length > 0)
{
switch (options.Value.StuckTasksProcessMode)
{
case StuckTasksProcessMode.Fail:
await tasksRepository.BeginBatchAsync(stoppingToken);
foreach (var task in stuckTasks.items)
{
var error = task.TaskStatus == TaskStatus.InProgress
? $"Task inactive since {task.LastActivityDate}"
: $"Task in queue since {task.DateAdded}";
TasksExtensions.SetTaskErrorResult((IBaseTaskWithConfigAndResult)task, error);
await tasksRepository.UpdateAsync(task, stoppingToken);
}

await tasksRepository.CommitBatchAsync(stoppingToken);
break;
case StuckTasksProcessMode.Restart:
var taskExecutor = scope.ServiceProvider.GetRequiredService<ITaskExecutor>();
foreach (var stuckTask in stuckTasks.items)
{
await taskExecutor.ExecuteAsync(stuckTask.Id, stoppingToken);
}

break;
default:
throw new ArgumentOutOfRangeException();
}
}

await Task.Delay(TimeSpan.FromMinutes(5), stoppingToken);
}
}
}
30 changes: 30 additions & 0 deletions src/Sitko.Core.Tasks/TasksExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Reflection;
using Sitko.Core.Tasks.Data.Entities;
using TaskStatus = Sitko.Core.Tasks.Data.Entities.TaskStatus;

namespace Sitko.Core.Tasks;

internal static class TasksExtensions
{
private static readonly MethodInfo? SetErrorResultMethodInfo =
typeof(TasksExtensions).GetMethod(nameof(SetErrorResult), BindingFlags.Static | BindingFlags.Public);

public static void SetErrorResult<TTask, TConfig, TResult>(TTask task, string error)
where TTask : IBaseTask<TConfig, TResult>
where TConfig : BaseTaskConfig, new()
where TResult : BaseTaskResult, new()
{
var result = new TResult { ErrorMessage = error, IsSuccess = false };
task.TaskStatus = TaskStatus.Fails;
task.Result = result;
task.ExecuteDateEnd = DateTimeOffset.UtcNow;
}

public static void SetTaskErrorResult(IBaseTaskWithConfigAndResult task, string error)
{
var scheduleMethod = SetErrorResultMethodInfo!.MakeGenericMethod(task.GetType(),
task.ConfigType, task.ResultType);
scheduleMethod.Invoke(null, new object?[] { task, error });
}
}

3 changes: 3 additions & 0 deletions src/Sitko.Core.Tasks/TasksModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Sitko.Core.Tasks.Data.Repository;
using Sitko.Core.Tasks.Execution;
using Sitko.Core.Tasks.Scheduling;
using Sitko.Core.Tasks.Tasks;

namespace Sitko.Core.Tasks;

Expand Down Expand Up @@ -61,6 +62,8 @@ public override void ConfigureServices(IApplicationContext applicationContext, I
services.Configure<TasksModuleOptions>(applicationContext.Configuration.GetSection(OptionsKey));
services.AddTransient<IRepository<TBaseTask, Guid>, TasksRepository<TBaseTask, TDbContext>>();
services.AddTransient<ITaskRepository<TBaseTask>, TasksRepository<TBaseTask, TDbContext>>();
services.AddHostedService<CleanerTask<TBaseTask>>();
services.AddHostedService<MaintenanceTask<TBaseTask, TDbContext>>();

ConfigureServicesInternal(applicationContext, services, startupOptions, executors);
startupOptions.ConfigureServices(services);
Expand Down
11 changes: 10 additions & 1 deletion src/Sitko.Core.Tasks/TasksModuleOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ public class TasksModuleOptions
{
public bool IsAllTasksDisabled { get; set; }
public string[] DisabledTasks { get; set; } = Array.Empty<string>();

public int? AllTasksRetentionDays { get; set; }
public Dictionary<string, int> RetentionDays { get; set; } = new();
public TimeSpan TasksInactivityTimeout { get; set; } = TimeSpan.FromMinutes(30);
public TimeSpan TasksWaitTimeout { get; set; } = TimeSpan.FromMinutes(60);
public StuckTasksProcessMode StuckTasksProcessMode { get; set; } = StuckTasksProcessMode.Fail;
}


Expand All @@ -24,6 +26,7 @@ public abstract class TasksModuleOptions<TBaseTask, TDbContext> : BaseModuleOpti
{
public List<Assembly> Assemblies { get; } = new();
private readonly List<Action<IServiceCollection>> jobServiceConfigurations = new();

public string TableName { get; set; } = "Tasks";

private readonly List<Action<TasksDbContextOptionsExtension<TBaseTask>>>
Expand Down Expand Up @@ -83,3 +86,9 @@ public TasksModuleOptions<TBaseTask, TDbContext> AddTask<TTask, TConfig, TResult

public abstract Type GetValidatorType();
}

public enum StuckTasksProcessMode
{
Fail = 1,
Restart = 2
}

0 comments on commit b4e69eb

Please sign in to comment.