From dc420eb4c2b5da50479bf813363f040579d8b150 Mon Sep 17 00:00:00 2001 From: Timo Notheisen Date: Wed, 18 Dec 2024 17:12:18 +0100 Subject: [PATCH] chore: adapt to new API --- .../InMemoryEventBusSubscriptionsManager.cs | 12 +- .../DefaultRabbitMQPersisterConnection.cs | 112 +++++++++++------- .../EventBus/RabbitMQ/EventBusRabbitMQ.cs | 95 ++++++++------- .../RabbitMQ/IRabbitMQPersisterConnection.cs | 4 +- 4 files changed, 123 insertions(+), 100 deletions(-) diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs index fd424ed882..1c0963bbd5 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/InMemoryEventBusSubscriptionsManager.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus; using Backbone.BuildingBlocks.Domain.Events; @@ -5,12 +6,7 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus; public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager { - private readonly Dictionary> _handlers; - - public InMemoryEventBusSubscriptionsManager() - { - _handlers = new Dictionary>(); - } + private readonly ConcurrentDictionary> _handlers = []; public void Clear() { @@ -45,7 +41,7 @@ public string GetEventKey() return GetEventKey(typeof(T)); } - public static string GetEventKey(Type eventType) + private static string GetEventKey(Type eventType) { return eventType.Name; } @@ -54,7 +50,7 @@ private void DoAddSubscription(Type handlerType, Type eventType) { var eventName = GetEventKey(eventType); - if (!HasSubscriptionsForEvent(eventName)) _handlers.Add(eventName, []); + _handlers.TryAdd(eventName, []); if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) throw new ArgumentException( diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs index 8100dc5038..18ef17c844 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using System.Net.Sockets; using Microsoft.Extensions.Logging; using Polly; @@ -10,12 +11,13 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus.RabbitMQ; public class DefaultRabbitMqPersistentConnection : IRabbitMqPersistentConnection { + private readonly SemaphoreSlim _semaphore = new(1); private readonly IConnectionFactory _connectionFactory; private readonly ILogger _logger; private readonly int _retryCount; - private readonly object _syncRoot = new(); private IConnection? _connection; + private IChannel? _channel; private bool _disposed; public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory, @@ -28,81 +30,103 @@ public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory, public bool IsConnected => _connection is { IsOpen: true } && !_disposed; - public IModel CreateModel() + public async Task Connect() { - if (!IsConnected) - throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); - - return _connection!.CreateModel(); - } - - public void Dispose() - { - if (_disposed) return; + if (IsConnected) + return; - _disposed = true; + await _semaphore.WaitAsync(); try { - _connection?.Dispose(); + if (IsConnected) + return; + + await ConnectInternal(); } - catch (IOException ex) + finally { - _logger.LogCritical(ex, "There was an error while disposing the connection."); + _semaphore.Release(); } } - public bool TryConnect() + private async Task ConnectInternal() { _logger.LogInformation("RabbitMQ Client is trying to connect"); - lock (_syncRoot) + var policy = Policy.Handle() + .Or() + .WaitAndRetryAsync(_retryCount, + retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + (ex, _) => _logger.ConnectionError(ex)); + + await policy.ExecuteAsync(async () => _connection = await _connectionFactory.CreateConnectionAsync()); + + if (!IsConnected) { - var policy = Policy.Handle() - .Or() - .WaitAndRetry(_retryCount, - retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), - (ex, _) => _logger.ConnectionError(ex)); + _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); + throw new Exception("RabbitMQ connections could not be created and opened"); + } - policy.Execute(() => _connection = _connectionFactory - .CreateConnection()); + _connection!.ConnectionShutdownAsync += OnConnectionShutdown; + _connection.CallbackExceptionAsync += OnCallbackException; + _connection.ConnectionBlockedAsync += OnConnectionBlocked; - if (!IsConnected) - { - _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); - return false; - } + _logger.LogInformation("RabbitMQ persistent connection acquired a connection to '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName); - _connection!.ConnectionShutdown += OnConnectionShutdown; - _connection!.CallbackException += OnCallbackException; - _connection!.ConnectionBlocked += OnConnectionBlocked; + _channel = await _connection.CreateChannelAsync(); - _logger.LogInformation( - "RabbitMQ persistent connection acquired a connection '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName); + _logger.LogInformation("Created a new channel"); + } - return true; - } + public IChannel GetChannel() + { + Debug.Assert(_channel != null, nameof(_channel) + " != null"); + + return _channel; } - private void OnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e) + public void Dispose() { if (_disposed) return; + + _disposed = true; + + try + { + _connection?.Dispose(); + } + catch (IOException ex) + { + _logger.LogCritical(ex, "There was an error while disposing the connection."); + } + } + + private async Task OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) + { + if (_disposed) + return; + _logger.ConnectionIsBlocked(); - TryConnect(); + await Connect(); } - private void OnCallbackException(object? sender, CallbackExceptionEventArgs e) + private async Task OnCallbackException(object sender, CallbackExceptionEventArgs e) { - if (_disposed) return; + if (_disposed) + return; + _logger.ConnectionThrewAnException(); - TryConnect(); + await Connect(); } - private void OnConnectionShutdown(object? sender, ShutdownEventArgs reason) + private async Task OnConnectionShutdown(object sender, ShutdownEventArgs reason) { - if (_disposed) return; + if (_disposed) + return; + _logger.ConnectionIsShutdown(); - TryConnect(); + await Connect(); } } diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs index e8c4e323a6..1129b1e2a4 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs @@ -17,7 +17,7 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus.RabbitMQ; public class EventBusRabbitMq : IEventBus, IDisposable { - private const string BROKER_NAME = "event_bus"; + private const string EXCHANGE_NAME = "event_bus"; private const string AUTOFAC_SCOPE_NAME = "event_bus"; private readonly ILifetimeScope _autofac; @@ -28,12 +28,12 @@ public class EventBusRabbitMq : IEventBus, IDisposable private readonly HandlerRetryBehavior _handlerRetryBehavior; private readonly IEventBusSubscriptionsManager _subsManager; - private IModel _consumerChannel; - private readonly string? _queueName; - private EventingBasicConsumer? _consumer; + private IChannel? _consumerChannel; + private readonly string _queueName; + private AsyncEventingBasicConsumer? _consumer; public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager? subsManager, HandlerRetryBehavior handlerRetryBehavior, string? queueName = null, + ILifetimeScope autofac, IEventBusSubscriptionsManager? subsManager, HandlerRetryBehavior handlerRetryBehavior, string queueName, int connectionRetryCount = 5) { _persistentConnection = @@ -41,7 +41,6 @@ public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILog _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; - _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _connectionRetryCount = connectionRetryCount; _handlerRetryBehavior = handlerRetryBehavior; @@ -49,30 +48,32 @@ public EventBusRabbitMq(IRabbitMqPersistentConnection persistentConnection, ILog public void Dispose() { - _consumerChannel.Dispose(); + _consumerChannel?.Dispose(); _subsManager.Clear(); } - public void StartConsuming() + public async void StartConsuming() { - using var channel = _persistentConnection.CreateModel(); - channel.ExchangeDeclare(BROKER_NAME, "direct"); + _consumerChannel = await CreateConsumerChannel(); + var channel = _persistentConnection.GetChannel(); + await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); if (_consumer is null) { throw new Exception("Cannot start consuming without a consumer set."); } - _consumerChannel.BasicConsume(_queueName, false, _consumer); + _consumerChannel.BasicConsumeAsync(_queueName, false, _consumer).GetAwaiter().GetResult(); } - public void Publish(DomainEvent @event) + public async void Publish(DomainEvent @event) { - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); var policy = Policy.Handle() .Or() - .WaitAndRetry(_connectionRetryCount, + .WaitAndRetryAsync(_connectionRetryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, _) => _logger.ErrorOnPublish(ex)); @@ -89,18 +90,19 @@ public void Publish(DomainEvent @event) var body = Encoding.UTF8.GetBytes(message); - policy.Execute(() => + await policy.ExecuteAsync(async () => { _logger.LogDebug("Publishing a {EventName} to RabbitMQ.", eventName); - using var channel = _persistentConnection.CreateModel(); - var properties = channel.CreateBasicProperties(); - properties.DeliveryMode = 2; // persistent - properties.MessageId = @event.DomainEventId; - - properties.CorrelationId = CustomLogContext.GetCorrelationId(); + var channel = _persistentConnection.GetChannel(); + var properties = new BasicProperties + { + DeliveryMode = DeliveryModes.Persistent, + MessageId = @event.DomainEventId, + CorrelationId = CustomLogContext.GetCorrelationId() + }; - channel.BasicPublish(BROKER_NAME, + await channel.BasicPublishAsync(EXCHANGE_NAME, eventName, true, properties, @@ -110,19 +112,19 @@ public void Publish(DomainEvent @event) }); } - public void Subscribe() + public async void Subscribe() where T : DomainEvent where TH : IDomainEventHandler { var eventName = _subsManager.GetEventKey(); - DoInternalSubscription(eventName); + await DoInternalSubscription(eventName); _logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name); _subsManager.AddSubscription(); } - private void DoInternalSubscription(string eventName) + private async Task DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (containsKey) @@ -131,37 +133,38 @@ private void DoInternalSubscription(string eventName) return; } - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); _logger.LogTrace("Trying to bind queue '{QueueName}' on RabbitMQ ...", _queueName); - using var channel = _persistentConnection.CreateModel(); - channel.QueueBind(_queueName, - BROKER_NAME, + var channel = _persistentConnection.GetChannel(); + await channel.QueueBindAsync(_queueName, + EXCHANGE_NAME, eventName); _logger.LogTrace("Successfully bound queue '{QueueName}' on RabbitMQ.", _queueName); } - private IModel CreateConsumerChannel() + private async Task CreateConsumerChannel() { - if (!_persistentConnection.IsConnected) _persistentConnection.TryConnect(); + if (!_persistentConnection.IsConnected) + await _persistentConnection.Connect(); _logger.LogInformation("Creating RabbitMQ consumer channel"); - var channel = _persistentConnection.CreateModel(); + var channel = _persistentConnection.GetChannel(); - channel.ExchangeDeclare(BROKER_NAME, - "direct"); + await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); // TODO: why declare again? This is already up in StartConsuming - channel.QueueDeclare(_queueName, - true, - false, - false, - null); + await channel.QueueDeclareAsync(_queueName, + durable: true, + exclusive: false, + autoDelete: false + ); - _consumer = new EventingBasicConsumer(channel); - _consumer.Received += async (_, eventArgs) => + _consumer = new AsyncEventingBasicConsumer(channel); + _consumer.ReceivedAsync += async (_, eventArgs) => { var eventName = eventArgs.RoutingKey; var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); @@ -175,23 +178,23 @@ private IModel CreateConsumerChannel() { await ProcessEvent(eventName, message); - channel.BasicAck(eventArgs.DeliveryTag, false); + await channel.BasicAckAsync(eventArgs.DeliveryTag, false); } } catch (Exception ex) { - channel.BasicReject(eventArgs.DeliveryTag, false); + await channel.BasicRejectAsync(eventArgs.DeliveryTag, false); _logger.ErrorWhileProcessingDomainEvent(eventName, ex); } }; - channel.CallbackException += (_, ea) => + channel.CallbackExceptionAsync += async (_, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); - _consumerChannel.Dispose(); - _consumerChannel = CreateConsumerChannel(); + _consumerChannel?.Dispose(); + _consumerChannel = await CreateConsumerChannel(); }; return channel; diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs index 6d4a65a71d..10f90943c3 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs @@ -7,7 +7,7 @@ public interface IRabbitMqPersistentConnection { bool IsConnected { get; } - bool TryConnect(); + Task Connect(); - IModel CreateModel(); + IChannel GetChannel(); }