Skip to content

Commit

Permalink
refactor: make all event bus methods async
Browse files Browse the repository at this point in the history
  • Loading branch information
tnotheis committed Dec 19, 2024
1 parent dc420eb commit 567cc4a
Show file tree
Hide file tree
Showing 27 changed files with 155 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ namespace Backbone.DatabaseMigrator;

public class DummyEventBus : IEventBus
{
public void Publish(DomainEvent @event)
public Task Publish(DomainEvent @event)
{
return Task.CompletedTask;
}

public void StartConsuming()
public Task StartConsuming()
{
return Task.CompletedTask;
}

public void Subscribe<T, TH>() where T : DomainEvent where TH : IDomainEventHandler<T>
public Task Subscribe<T, TH>() where T : DomainEvent where TH : IDomainEventHandler<T>
{
return Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,30 @@ public EventHandlerService(IEventBus eventBus, IEnumerable<AbstractModule> modul
_logger = logger;
}

public Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
SubscribeToEvents();
StartConsuming();

return Task.CompletedTask;
await SubscribeToEvents();
await StartConsuming();
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

private void SubscribeToEvents()
private async Task SubscribeToEvents()
{
_logger.LogInformation("Subscribing to events...");
foreach (var module in _modules)
{
module.ConfigureEventBus(_eventBus);
await module.ConfigureEventBus(_eventBus);
}

_logger.LogInformation("Successfully subscribed to events.");
}

private void StartConsuming()
private async Task StartConsuming()
{
_eventBus.StartConsuming();
await _eventBus.StartConsuming();
}
}
6 changes: 4 additions & 2 deletions BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public abstract class AbstractModule

public abstract void ConfigureServices(IServiceCollection services, IConfigurationSection configuration);

public abstract void ConfigureEventBus(IEventBus eventBus);
public abstract Task ConfigureEventBus(IEventBus eventBus);

public virtual void PostStartupValidation(IServiceProvider serviceProvider) { }
public virtual void PostStartupValidation(IServiceProvider serviceProvider)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ namespace Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventB

public interface IEventBus
{
void Publish(IEnumerable<DomainEvent> events)
async Task Publish(IEnumerable<DomainEvent> events)
{
foreach (var domainEvent in events)
{
Publish(domainEvent);
await Publish(domainEvent);
}
}

void Publish(DomainEvent @event);
void StartConsuming();
Task Publish(DomainEvent @event);
Task StartConsuming();

void Subscribe<T, TH>()
Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public async ValueTask DisposeAsync()
await _processor.CloseAsync();
}

public async void Publish(DomainEvent @event)
public async Task Publish(DomainEvent @event)
{
var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, "");
var jsonMessage = JsonConvert.SerializeObject(@event, new JsonSerializerSettings
Expand All @@ -78,7 +78,7 @@ await _logger.TraceTime(async () =>
_logger.LogDebug("Successfully sent domain event with id '{MessageId}'.", message.MessageId);
}

public void Subscribe<T, TH>()
public async Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>
{
Expand All @@ -90,12 +90,12 @@ public void Subscribe<T, TH>()
{
_logger.LogInformation("Trying to create subscription on Service Bus...");

_serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName,
await _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName,
new CreateRuleOptions
{
Filter = new CorrelationRuleFilter { Subject = eventName },
Name = eventName
}).GetAwaiter().GetResult();
});

_logger.LogInformation("Successfully created subscription on Service Bus.");
}
Expand All @@ -109,9 +109,9 @@ public void Subscribe<T, TH>()
_subscriptionManager.AddSubscription<T, TH>();
}

public void StartConsuming()
public async Task StartConsuming()
{
RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult();
await RegisterSubscriptionClientMessageHandlerAsync();
}

private async Task RegisterSubscriptionClientMessageHandlerAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async ValueTask DisposeAsync()
await _connection.DisposeAsync();
}

public async void Publish(DomainEvent @event)
public async Task Publish(DomainEvent @event)
{
var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, "");

Expand All @@ -77,7 +77,7 @@ public async void Publish(DomainEvent @event)
_logger.EventWasNotProcessed(messageId);
}

public void Subscribe<T, TH>()
public Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>
{
Expand All @@ -86,11 +86,13 @@ public void Subscribe<T, TH>()
_logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name);

_subscriptionManager.AddSubscription<T, TH>();

return Task.CompletedTask;
}

public void StartConsuming()
public async Task StartConsuming()
{
_connection.SubscriberClient.StartAsync(OnIncomingEvent);
await _connection.SubscriberClient.StartAsync(OnIncomingEvent);
}

private static string RemoveDomainEventSuffix(string typeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ public class DefaultRabbitMqPersistentConnection
private readonly int _retryCount;

private IConnection? _connection;
private IChannel? _channel;
private bool _disposed;

public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory,
Expand Down Expand Up @@ -74,16 +73,16 @@ private async Task ConnectInternal()

_logger.LogInformation("RabbitMQ persistent connection acquired a connection to '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName);

_channel = await _connection.CreateChannelAsync();

_logger.LogInformation("Created a new channel");
}

public IChannel GetChannel()
public async Task<IChannel> CreateChannel()
{
Debug.Assert(_channel != null, nameof(_channel) + " != null");
Debug.Assert(IsConnected, "RabbitMQ connection is not established");

var channel = await _connection!.CreateChannelAsync();

return _channel;
return channel;
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,19 @@ public void Dispose()
_subsManager.Clear();
}

public async void StartConsuming()
public async Task StartConsuming()
{
_consumerChannel = await CreateConsumerChannel();
var channel = _persistentConnection.GetChannel();
await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct");
await _consumerChannel!.ExchangeDeclareAsync(EXCHANGE_NAME, "direct");

if (_consumer is null)
{
throw new Exception("Cannot start consuming without a consumer set.");
}

_consumerChannel.BasicConsumeAsync(_queueName, false, _consumer).GetAwaiter().GetResult();
await _consumerChannel!.BasicConsumeAsync(_queueName, false, _consumer);
}

public async void Publish(DomainEvent @event)
public async Task Publish(DomainEvent @event)
{
if (!_persistentConnection.IsConnected)
await _persistentConnection.Connect();
Expand Down Expand Up @@ -94,7 +92,7 @@ await policy.ExecuteAsync(async () =>
{
_logger.LogDebug("Publishing a {EventName} to RabbitMQ.", eventName);

var channel = _persistentConnection.GetChannel();
await using var channel = await _persistentConnection.CreateChannel();
var properties = new BasicProperties
{
DeliveryMode = DeliveryModes.Persistent,
Expand All @@ -112,10 +110,12 @@ await channel.BasicPublishAsync(EXCHANGE_NAME,
});
}

public async void Subscribe<T, TH>()
public async Task Subscribe<T, TH>()
where T : DomainEvent
where TH : IDomainEventHandler<T>
{
await EnsureConsumerChannelExists();

var eventName = _subsManager.GetEventKey<T>();
await DoInternalSubscription(eventName);

Expand All @@ -124,6 +124,14 @@ public async void Subscribe<T, TH>()
_subsManager.AddSubscription<T, TH>();
}

private async Task EnsureConsumerChannelExists()
{
if (_consumerChannel is null)
{
await CreateConsumerChannel();
}
}

private async Task DoInternalSubscription(string eventName)
{
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
Expand All @@ -138,32 +146,31 @@ private async Task DoInternalSubscription(string eventName)

_logger.LogTrace("Trying to bind queue '{QueueName}' on RabbitMQ ...", _queueName);

var channel = _persistentConnection.GetChannel();
await channel.QueueBindAsync(_queueName,
await _consumerChannel!.QueueBindAsync(_queueName,
EXCHANGE_NAME,
eventName);

_logger.LogTrace("Successfully bound queue '{QueueName}' on RabbitMQ.", _queueName);
}

private async Task<IChannel> CreateConsumerChannel()
private async Task CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
await _persistentConnection.Connect();

_logger.LogInformation("Creating RabbitMQ consumer channel");

var channel = _persistentConnection.GetChannel();
_consumerChannel = await _persistentConnection.CreateChannel();

await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); // TODO: why declare again? This is already up in StartConsuming
await _consumerChannel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); // TODO: why declare again? This is already up in StartConsuming

await channel.QueueDeclareAsync(_queueName,
await _consumerChannel.QueueDeclareAsync(_queueName,
durable: true,
exclusive: false,
autoDelete: false
);

_consumer = new AsyncEventingBasicConsumer(channel);
_consumer = new AsyncEventingBasicConsumer(_consumerChannel);
_consumer.ReceivedAsync += async (_, eventArgs) =>
{
var eventName = eventArgs.RoutingKey;
Expand All @@ -178,26 +185,24 @@ await channel.QueueDeclareAsync(_queueName,
{
await ProcessEvent(eventName, message);

await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
await _consumerChannel.BasicAckAsync(eventArgs.DeliveryTag, false);
}
}
catch (Exception ex)
{
await channel.BasicRejectAsync(eventArgs.DeliveryTag, false);
await _consumerChannel.BasicRejectAsync(eventArgs.DeliveryTag, false);

_logger.ErrorWhileProcessingDomainEvent(eventName, ex);
}
};

channel.CallbackExceptionAsync += async (_, ea) =>
_consumerChannel.CallbackExceptionAsync += async (_, ea) =>
{
_logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

_consumerChannel?.Dispose();
_consumerChannel = await CreateConsumerChannel();
await CreateConsumerChannel();
};

return channel;
}

private async Task ProcessEvent(string eventName, string message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ public interface IRabbitMqPersistentConnection

Task Connect();

IChannel GetChannel();
Task<IChannel> CreateChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void PublishDomainEvents(List<Entity> entities)
{
foreach (var e in entities)
{
_eventBus.Publish(e.DomainEvents);
_ = _eventBus.Publish(e.DomainEvents);
e.ClearDomainEvents();
}
}
Expand Down
Loading

0 comments on commit 567cc4a

Please sign in to comment.