Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: notifica al sincronizador en tiempo real cuando hay requests pendientes por procesar #28

Merged
merged 1 commit into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Api.Core.Domain/Common/GetPendingRequestsMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Api.Core.Domain.Common;

public sealed record GetPendingRequestsMessage(string SubscriptionKey, string EmpresaRfc);
10 changes: 9 additions & 1 deletion src/Api.Presentation.WebApi/Controllers/RequestsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
using Api.Core.Application.Requests.Queries.GetApiRequestById;
using Api.Core.Application.Requests.Queries.GetApiRequests;
using Api.Core.Application.Requests.Queries.GetPendingApiRequests;
using Api.Core.Domain.Common;
using Api.Core.Domain.Requests;
using Api.Presentation.WebApi.Authentication;
using Api.Presentation.WebApi.Filters;
using Api.Presentation.WebApi.Hubs;
using ARSoftware.Contpaqi.Api.Common.Domain;
using MediatR;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.SignalR;

namespace Api.Presentation.WebApi.Controllers;

Expand All @@ -18,13 +21,15 @@ namespace Api.Presentation.WebApi.Controllers;
[ApiExceptionFilter]
public class RequestsController : ControllerBase
{
private readonly IHubContext<ApiRequestHub, IApiRequestHubClient> _hubContext;
private readonly LinkGenerator _linkGenerator;
private readonly IMediator _mediator;

public RequestsController(IMediator mediator, LinkGenerator linkGenerator)
public RequestsController(IMediator mediator, LinkGenerator linkGenerator, IHubContext<ApiRequestHub, IApiRequestHubClient> hubContext)
{
_mediator = mediator;
_linkGenerator = linkGenerator;
_hubContext = hubContext;
}

[FromHeader(Name = "Ocp-Apim-Subscription-Key")]
Expand Down Expand Up @@ -119,6 +124,9 @@ public async Task<ActionResult<Guid>> Post(ContpaqiRequest apiRequest)

Guid requestId = await _mediator.Send(new CreateApiRequestCommand(apiRequest, ApimSubscriptionKey, EmpresaRfc));

await _hubContext.Clients.Group(ApimSubscriptionKey)
.GetPendingRequests(new GetPendingRequestsMessage(ApimSubscriptionKey, EmpresaRfc));

ApiRequest? request = await _mediator.Send(new GetApiRequestByIdQuery(requestId, ApimSubscriptionKey));

if (EsperarRespuesta == true)
Expand Down
15 changes: 15 additions & 0 deletions src/Api.Presentation.WebApi/Hubs/ApiRequestHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Microsoft.AspNetCore.SignalR;

namespace Api.Presentation.WebApi.Hubs;

public class ApiRequestHub : Hub<IApiRequestHubClient>
{
public override async Task OnConnectedAsync()
{
string groupName = Context.GetHttpContext()!.Request.Headers["Ocp-Apim-Subscription-Key"]!;

await Groups.AddToGroupAsync(Context.ConnectionId, groupName);

await base.OnConnectedAsync();
}
}
8 changes: 8 additions & 0 deletions src/Api.Presentation.WebApi/Hubs/IApiRequestHubClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using Api.Core.Domain.Common;

namespace Api.Presentation.WebApi.Hubs;

public interface IApiRequestHubClient
{
Task GetPendingRequests(GetPendingRequestsMessage getPendingRequestsMessage);
}
5 changes: 5 additions & 0 deletions src/Api.Presentation.WebApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using Api.Infrastructure;
using Api.Infrastructure.Persistence;
using Api.Presentation.WebApi.Authentication;
using Api.Presentation.WebApi.Hubs;
using ARSoftware.Contpaqi.Api.Common.Domain;
using Microsoft.OpenApi.Models;

Expand All @@ -25,6 +26,8 @@
});
builder.Services.AddApplicationServices().AddInfrastructureServices(builder.Configuration);

builder.Services.AddSignalR();

builder.Services.AddScoped<ApiKeyAuthFilter>();

// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
Expand Down Expand Up @@ -82,6 +85,8 @@

app.UseAuthorization();

app.MapHub<ApiRequestHub>("/hubs/apiRequestHub");

app.MapControllers();

app.Run();
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ public sealed class ApiSyncConfig
private readonly TimeOnly _timeStarted = TimeOnly.FromDateTime(DateTime.Now);
public string SubscriptionKey { get; set; } = "00000000000000000000000000000000";
public string BaseAddress { get; set; } = string.Empty;
public TimeOnly WaitTime { get; set; } = TimeOnly.MinValue;
public TimeOnly ShutdownTime { get; set; } = new(20, 0, 0);
public List<string> Empresas { get; set; } = new();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Client" Version="7.0.9" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="7.0.1" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="7.0.0" />
<PackageReference Include="Serilog.Settings.Configuration" Version="7.0.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Api.Sync.Core.Application.Common.Models;
using Microsoft.AspNetCore.Http.Connections;
using Microsoft.AspNetCore.SignalR.Client;

namespace Api.Sync.Presentation.WorkerService;

public sealed class ApiRequestHubClientFactory
{
public static HubConnection BuildConnection(ApiSyncConfig apiSyncConfig)
{
HubConnection connection = new HubConnectionBuilder().WithUrl(new Uri($"{apiSyncConfig.BaseAddress}hubs/apiRequestHub"), options =>
{
options.Transports = HttpTransportType.WebSockets;
options.SkipNegotiation = true;
options.Headers.Add("Ocp-Apim-Subscription-Key", apiSyncConfig.SubscriptionKey);
})
.WithAutomaticReconnect()
.Build();

return connection;
}
}
50 changes: 37 additions & 13 deletions src/Api.Sync.Presentation.WorkerService/Worker.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System.Collections.Concurrent;
using System.Collections.Immutable;
using Api.Core.Domain.Common;
using Api.Sync.Core.Application.Api.Commands.ProcessApiRequest;
using Api.Sync.Core.Application.Api.Queries.GetPendingApiRequests;
using Api.Sync.Core.Application.Common.Models;
Expand All @@ -7,18 +9,21 @@
using ARSoftware.Contpaqi.Api.Common.Domain;
using ARSoftware.Contpaqi.Comercial.Sdk.Abstractions.Models;
using MediatR;
using Microsoft.AspNetCore.SignalR.Client;
using Microsoft.Extensions.Options;

namespace Api.Sync.Presentation.WorkerService;

public sealed class Worker : BackgroundService
{
private readonly ApiSyncConfig _apiSyncConfig;
private readonly HubConnection _connection;
private readonly ContpaqiComercialConfig _contpaqiComercialConfig;
private readonly IEmpresaRepository _empresaRepository;
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly ILogger<Worker> _logger;
private readonly IMediator _mediator;
private readonly ConcurrentQueue<GetPendingRequestsMessage> _pendingRequestQueue = new();

public Worker(ILogger<Worker> logger, IMediator mediator, IHostApplicationLifetime hostApplicationLifetime,
IOptions<ApiSyncConfig> apiSyncConfigOptions, IOptions<ContpaqiComercialConfig> contpaqiComercialConfigOptions,
Expand All @@ -30,6 +35,17 @@ public Worker(ILogger<Worker> logger, IMediator mediator, IHostApplicationLifeti
_empresaRepository = empresaRepository;
_apiSyncConfig = apiSyncConfigOptions.Value;
_contpaqiComercialConfig = contpaqiComercialConfigOptions.Value;

_connection = ApiRequestHubClientFactory.BuildConnection(_apiSyncConfig);
_connection.On<GetPendingRequestsMessage>("GetPendingRequests", getPendingRequestMessage =>
{
_logger.LogInformation("Get pending requests notification received: {GetPendingRequestsMessage}", getPendingRequestMessage);
if (_apiSyncConfig.Empresas.Contains(getPendingRequestMessage.EmpresaRfc))
_pendingRequestQueue.Enqueue(getPendingRequestMessage);
});

foreach (string empresa in _apiSyncConfig.Empresas)
_pendingRequestQueue.Enqueue(new GetPendingRequestsMessage(_apiSyncConfig.SubscriptionKey, empresa));
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
Expand All @@ -41,16 +57,25 @@ private async Task DoWork(CancellationToken stoppingToken)
{
try
{
await _connection.StartAsync(stoppingToken);

ImmutableList<Empresa> empresas = (await _empresaRepository.BuscarTodoAsync(LoadRelatedDataOptions.Default, stoppingToken))
.ToImmutableList();

while (!stoppingToken.IsCancellationRequested)
{
Task waitingTask = Task.Delay(_apiSyncConfig.WaitTime.ToTimeSpan(), stoppingToken);
if (_pendingRequestQueue.IsEmpty)
{
_logger.LogDebug("Esperando la siguiente iteraci�n.");
await Task.Delay(1000, stoppingToken);
continue;
}

_logger.LogInformation("Procesando solicitudes pendientes.");

foreach (string empresaRfc in _apiSyncConfig.Empresas)
if (_pendingRequestQueue.TryDequeue(out GetPendingRequestsMessage? getPendingRequestMessage))
{
Empresa? empresa = empresas.FirstOrDefault(e => e.Parametros!.Rfc == empresaRfc);
Empresa? empresa = empresas.FirstOrDefault(e => e.Parametros!.Rfc == getPendingRequestMessage.EmpresaRfc);

if (empresa is null) continue;

Expand All @@ -67,24 +92,18 @@ private async Task DoWork(CancellationToken stoppingToken)
{
int requestIndex = apiRequests.IndexOf(apiRequest) + 1;
int requestsCount = apiRequests.Count;
_logger.LogInformation("Empresa: {EmpresaRfc}. Procesando [{requestIndex} of {requestsCount}]", empresaRfc,
requestIndex, requestsCount);
_logger.LogInformation("Empresa: {EmpresaRfc}. Procesando [{requestIndex} of {requestsCount}]",
empresa.Parametros!.Rfc, requestIndex, requestsCount);

await _mediator.Send(new ProcessApiRequestCommand(apiRequest), stoppingToken);
}
}

if (_apiSyncConfig.ShouldShutDown())
{
_logger.LogInformation("La aplicacion debe apgarse.");
_logger.LogInformation("La aplicaci�n debe apagarse.");
break;
}

if (_apiSyncConfig.WaitTime != TimeOnly.MinValue)
{
_logger.LogDebug("Esperando la siguiente iteracion.");
await waitingTask;
}
}
}
catch (OperationCanceledException e)
Expand All @@ -93,9 +112,14 @@ private async Task DoWork(CancellationToken stoppingToken)
}
catch (Exception e)
{
_logger.LogCritical(e, "Critical error ocurred.");
_logger.LogCritical(e, "Critical error occurred.");
}

_hostApplicationLifetime.StopApplication();
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
await _connection.DisposeAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
"Contpaqi": "Data Source=AR-SERVER\\COMPAC;User ID=sa;Password=Sdmramos1;Connect Timeout=30;Encrypt=False;"
},
"ApiSyncConfig": {
"BaseAddress": "https://localhost:7082/",
"WaitTime": "00:00:05"
"BaseAddress": "https://localhost:7082/"
},
"ContpaqiComercialConfig": {
"Usuario": "SUPERVISOR",
Expand Down
1 change: 0 additions & 1 deletion src/Api.Sync.Presentation.WorkerService/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"ApiSyncConfig": {
"SubscriptionKey": "00000000000000000000000000000000",
"BaseAddress": "https://contpaqiapim.azure-api.net/comercial/",
"WaitTime": "00:00:30",
"ShutdownTime": "20:00:00",
"Empresas": [ "URE180429TM6" ]
},
Expand Down