Skip to content

Commit

Permalink
chore: adapt to new API
Browse files Browse the repository at this point in the history
  • Loading branch information
tnotheis committed Dec 18, 2024
1 parent eec78b0 commit dc420eb
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
using System.Collections.Concurrent;
using Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventBus;
using Backbone.BuildingBlocks.Domain.Events;

namespace Backbone.BuildingBlocks.Infrastructure.EventBus;

public class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;

public InMemoryEventBusSubscriptionsManager()
{
_handlers = new Dictionary<string, List<SubscriptionInfo>>();
}
private readonly ConcurrentDictionary<string, List<SubscriptionInfo>> _handlers = [];

public void Clear()
{
Expand Down Expand Up @@ -45,7 +41,7 @@ public string GetEventKey<T>()
return GetEventKey(typeof(T));
}

public static string GetEventKey(Type eventType)
private static string GetEventKey(Type eventType)
{
return eventType.Name;
}
Expand All @@ -54,7 +50,7 @@ private void DoAddSubscription(Type handlerType, Type eventType)
{
var eventName = GetEventKey(eventType);

if (!HasSubscriptionsForEvent(eventName)) _handlers.Add(eventName, []);
_handlers.TryAdd(eventName, []);

if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
throw new ArgumentException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Diagnostics;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using Polly;
Expand All @@ -10,12 +11,13 @@ namespace Backbone.BuildingBlocks.Infrastructure.EventBus.RabbitMQ;
public class DefaultRabbitMqPersistentConnection
: IRabbitMqPersistentConnection
{
private readonly SemaphoreSlim _semaphore = new(1);
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger<DefaultRabbitMqPersistentConnection> _logger;
private readonly int _retryCount;

private readonly object _syncRoot = new();
private IConnection? _connection;
private IChannel? _channel;
private bool _disposed;

public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory,
Expand All @@ -28,81 +30,103 @@ public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory,

public bool IsConnected => _connection is { IsOpen: true } && !_disposed;

public IModel CreateModel()
public async Task Connect()
{
if (!IsConnected)
throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");

return _connection!.CreateModel();
}

public void Dispose()
{
if (_disposed) return;
if (IsConnected)
return;

_disposed = true;
await _semaphore.WaitAsync();

try
{
_connection?.Dispose();
if (IsConnected)
return;

await ConnectInternal();
}
catch (IOException ex)
finally
{
_logger.LogCritical(ex, "There was an error while disposing the connection.");
_semaphore.Release();
}
}

public bool TryConnect()
private async Task ConnectInternal()
{
_logger.LogInformation("RabbitMQ Client is trying to connect");

lock (_syncRoot)
var policy = Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_retryCount,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(ex, _) => _logger.ConnectionError(ex));

await policy.ExecuteAsync(async () => _connection = await _connectionFactory.CreateConnectionAsync());

if (!IsConnected)
{
var policy = Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(_retryCount,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(ex, _) => _logger.ConnectionError(ex));
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
throw new Exception("RabbitMQ connections could not be created and opened");
}

policy.Execute(() => _connection = _connectionFactory
.CreateConnection());
_connection!.ConnectionShutdownAsync += OnConnectionShutdown;
_connection.CallbackExceptionAsync += OnCallbackException;
_connection.ConnectionBlockedAsync += OnConnectionBlocked;

if (!IsConnected)
{
_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
return false;
}
_logger.LogInformation("RabbitMQ persistent connection acquired a connection to '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName);

_connection!.ConnectionShutdown += OnConnectionShutdown;
_connection!.CallbackException += OnCallbackException;
_connection!.ConnectionBlocked += OnConnectionBlocked;
_channel = await _connection.CreateChannelAsync();

_logger.LogInformation(
"RabbitMQ persistent connection acquired a connection '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName);
_logger.LogInformation("Created a new channel");
}

return true;
}
public IChannel GetChannel()
{
Debug.Assert(_channel != null, nameof(_channel) + " != null");

return _channel;
}

private void OnConnectionBlocked(object? sender, ConnectionBlockedEventArgs e)
public void Dispose()
{
if (_disposed) return;

_disposed = true;

try
{
_connection?.Dispose();
}
catch (IOException ex)
{
_logger.LogCritical(ex, "There was an error while disposing the connection.");
}
}

private async Task OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed)
return;

_logger.ConnectionIsBlocked();
TryConnect();
await Connect();
}

private void OnCallbackException(object? sender, CallbackExceptionEventArgs e)
private async Task OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
if (_disposed)
return;

_logger.ConnectionThrewAnException();
TryConnect();
await Connect();
}

private void OnConnectionShutdown(object? sender, ShutdownEventArgs reason)
private async Task OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;
if (_disposed)
return;

_logger.ConnectionIsShutdown();
TryConnect();
await Connect();
}
}

Expand Down
Loading

0 comments on commit dc420eb

Please sign in to comment.