-
Notifications
You must be signed in to change notification settings - Fork 266
/
TxBehavior.cs
78 lines (66 loc) · 3.7 KB
/
TxBehavior.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
using System;
using System.Data;
using System.Diagnostics;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using N8T.Core.Domain;
namespace N8T.Infrastructure.EfCore
{
//[DebuggerStepThrough]
/// <summary>
/// MediatR pipeline behavior that wraps requests marked with <see cref="ITxRequest"/> in a
/// database transaction and, once the transaction has been committed, publishes the domain
/// events collected by the <see cref="IDomainEventContext"/> through the mediator.
/// Requests that are not <see cref="ITxRequest"/> are passed straight to the next delegate.
/// </summary>
/// <typeparam name="TRequest">The MediatR request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">The response type produced for <typeparamref name="TRequest"/>.</typeparam>
public class TxBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull, IRequest<TResponse>
    where TResponse : notnull
{
    private readonly IDomainEventContext _domainEventContext;
    private readonly IDbFacadeResolver _dbFacadeResolver;
    private readonly IMediator _mediator;
    private readonly ILogger<TxBehavior<TRequest, TResponse>> _logger;

    /// <summary>
    /// Creates the behavior from its collaborators.
    /// </summary>
    /// <param name="dbFacadeResolver">Gives access to the database facade used to open the transaction.</param>
    /// <param name="domainEventContext">Source of the domain events to publish after the commit.</param>
    /// <param name="mediator">Mediator used to publish the wrapped domain events.</param>
    /// <param name="logger">Logger for pipeline diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any of the arguments is <c>null</c>.</exception>
    public TxBehavior(IDbFacadeResolver dbFacadeResolver, IDomainEventContext domainEventContext,
        IMediator mediator, ILogger<TxBehavior<TRequest, TResponse>> logger)
    {
        _domainEventContext = domainEventContext ?? throw new ArgumentNullException(nameof(domainEventContext));
        _dbFacadeResolver = dbFacadeResolver ?? throw new ArgumentNullException(nameof(dbFacadeResolver));
        _mediator = mediator ?? throw new ArgumentNullException(nameof(mediator));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Runs the rest of the pipeline; for <see cref="ITxRequest"/> requests it does so inside a
    /// <see cref="IsolationLevel.ReadCommitted"/> transaction, commits, and then publishes the
    /// collected domain events.
    /// </summary>
    /// <param name="request">The incoming request.</param>
    /// <param name="cancellationToken">Token forwarded to the transaction, commit and publish calls.</param>
    /// <param name="next">The next delegate in the pipeline (ultimately the request handler).</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken,
        RequestHandlerDelegate<TResponse> next)
    {
        // Only requests explicitly marked as transactional get the transaction treatment.
        if (request is not ITxRequest)
        {
            return await next();
        }

        // NOTE(review): the two messages below say "Handled" although the handler has not run
        // yet; the text is kept unchanged because log consumers may match on it.
        _logger.LogInformation("{Prefix} Handled command {MediatRRequest}", nameof(TxBehavior<TRequest, TResponse>), typeof(TRequest).FullName);

        // Serializing the request is only worth paying for (and only worth risking a
        // serialization failure for) when the Debug entry will actually be written.
        var debugEnabled = _logger.IsEnabled(LogLevel.Debug);
        if (debugEnabled)
        {
            _logger.LogDebug("{Prefix} Handled command {MediatRRequest} with content {RequestContent}", nameof(TxBehavior<TRequest, TResponse>), typeof(TRequest).FullName, JsonSerializer.Serialize(request));
        }

        _logger.LogInformation("{Prefix} Open the transaction for {MediatRRequest}", nameof(TxBehavior<TRequest, TResponse>), typeof(TRequest).FullName);

        // The whole unit of work goes through the database's execution strategy.
        // NOTE(review): presumably so a retrying strategy can re-run the delegate as a unit;
        // event publishing also happens inside the delegate - confirm that is intended.
        var strategy = _dbFacadeResolver.Database.CreateExecutionStrategy();

        return await strategy.ExecuteAsync(async () =>
        {
            // Achieving atomicity: the transaction is disposed when the delegate exits,
            // whether it completes normally or throws.
            await using var transaction = await _dbFacadeResolver.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);

            var response = await next();
            _logger.LogInformation("{Prefix} Executed the {MediatRRequest} request", nameof(TxBehavior<TRequest, TResponse>), typeof(TRequest).FullName);

            await transaction.CommitAsync(cancellationToken);

            // Snapshot the events so the sequence is enumerated exactly once.
            var domainEvents = _domainEventContext.GetDomainEvents().ToList();

            var tasks = domainEvents
                .Select(async @event =>
                {
                    // because we have int identity
                    //var id = (response as dynamic)?.Id;
                    //@event.MetaData.Add("id", id);
                    await _mediator.Publish(new EventWrapper(@event), cancellationToken);

                    if (debugEnabled)
                    {
                        // Serialize by runtime type: the generic overload would use the declared
                        // (possibly interface/base) type and drop the concrete event's properties.
                        var eventType = @event.GetType();
                        _logger.LogDebug(
                            "{Prefix} Published domain event {DomainEventName} with payload {DomainEventContent}", nameof(TxBehavior<TRequest, TResponse>), eventType.FullName, JsonSerializer.Serialize(@event, eventType));
                    }
                });

            await Task.WhenAll(tasks);

            // Logged only after every publish has completed, so the message is never
            // emitted for a batch that failed to publish.
            _logger.LogInformation("{Prefix} Published domain events for {MediatRRequest}", nameof(TxBehavior<TRequest, TResponse>), typeof(TRequest).FullName);

            return response;
        });
    }
}
}