diff --git a/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs b/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs index cc9e479793..f64bd5a778 100644 --- a/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs +++ b/Applications/DatabaseMigrator/src/DatabaseMigrator/DummyEventBus.cs @@ -5,15 +5,18 @@ namespace Backbone.DatabaseMigrator; public class DummyEventBus : IEventBus { - public void Publish(DomainEvent @event) + public Task Publish(DomainEvent @event) { + return Task.CompletedTask; } - public void StartConsuming() + public Task StartConsuming() { + return Task.CompletedTask; } - public void Subscribe() where T : DomainEvent where TH : IDomainEventHandler + public Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { + return Task.CompletedTask; } } diff --git a/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs b/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs index f0574be97a..1cd8f44fce 100644 --- a/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs +++ b/Applications/EventHandlerService/src/EventHandlerService/EventHandlerService.cs @@ -16,12 +16,10 @@ public EventHandlerService(IEventBus eventBus, IEnumerable modul _logger = logger; } - public Task StartAsync(CancellationToken cancellationToken) + public async Task StartAsync(CancellationToken cancellationToken) { - SubscribeToEvents(); - StartConsuming(); - - return Task.CompletedTask; + await SubscribeToEvents(); + await StartConsuming(); } public Task StopAsync(CancellationToken cancellationToken) @@ -29,19 +27,19 @@ public Task StopAsync(CancellationToken cancellationToken) return Task.CompletedTask; } - private void SubscribeToEvents() + private async Task SubscribeToEvents() { _logger.LogInformation("Subscribing to events..."); foreach (var module in _modules) { - module.ConfigureEventBus(_eventBus); + await module.ConfigureEventBus(_eventBus); } _logger.LogInformation("Successfully subscribed to events."); } - private void StartConsuming() + private async Task StartConsuming() { - _eventBus.StartConsuming(); + await _eventBus.StartConsuming(); } } diff --git a/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs b/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs index c261f58d14..9212e235ea 100644 --- a/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs +++ b/BuildingBlocks/src/BuildingBlocks.API/AbstractModule.cs @@ -10,7 +10,9 @@ public abstract class AbstractModule public abstract void ConfigureServices(IServiceCollection services, IConfigurationSection configuration); - public abstract void ConfigureEventBus(IEventBus eventBus); + public abstract Task ConfigureEventBus(IEventBus eventBus); - public virtual void PostStartupValidation(IServiceProvider serviceProvider) { } + public virtual void PostStartupValidation(IServiceProvider serviceProvider) + { + } } diff --git a/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs b/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs index de3fb18467..94831b71a4 100644 --- a/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs +++ b/BuildingBlocks/src/BuildingBlocks.Application.Abstractions/Infrastructure/EventBus/IEventBus.cs @@ -4,18 +4,18 @@ namespace Backbone.BuildingBlocks.Application.Abstractions.Infrastructure.EventB public interface IEventBus { - void Publish(IEnumerable events) + async Task Publish(IEnumerable events) { foreach (var domainEvent in events) { - Publish(domainEvent); + await Publish(domainEvent); } } - void Publish(DomainEvent @event); - void StartConsuming(); + Task Publish(DomainEvent @event); + Task StartConsuming(); - void Subscribe() + Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler; } diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs index fb0e4dcaad..ff59554cb2 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/AzureServiceBus/EventBusAzureServiceBus.cs @@ -53,7 +53,7 @@ public async ValueTask DisposeAsync() await _processor.CloseAsync(); } - public async void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, ""); var jsonMessage = JsonConvert.SerializeObject(@event, new JsonSerializerSettings @@ -78,7 +78,7 @@ await _logger.TraceTime(async () => _logger.LogDebug("Successfully sent domain event with id '{MessageId}'.", message.MessageId); } - public void Subscribe() + public async Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { @@ -90,12 +90,12 @@ public void Subscribe() { _logger.LogInformation("Trying to create subscription on Service Bus..."); - _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName, + await _serviceBusPersisterConnection.AdministrationClient.CreateRuleAsync(TOPIC_NAME, _subscriptionName, new CreateRuleOptions { Filter = new CorrelationRuleFilter { Subject = eventName }, Name = eventName - }).GetAwaiter().GetResult(); + }); _logger.LogInformation("Successfully created subscription on Service Bus."); } @@ -109,9 +109,9 @@ public void Subscribe() _subscriptionManager.AddSubscription(); } - public void StartConsuming() + public async Task StartConsuming() { - RegisterSubscriptionClientMessageHandlerAsync().GetAwaiter().GetResult(); + await RegisterSubscriptionClientMessageHandlerAsync(); } private async Task RegisterSubscriptionClientMessageHandlerAsync() diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs index f25bf037a5..16c1f54246 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/GoogleCloudPubSub/EventBusGoogleCloudPubSub.cs @@ -52,7 +52,7 @@ public async ValueTask DisposeAsync() await _connection.DisposeAsync(); } - public async void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { var eventName = @event.GetType().Name.Replace(DOMAIN_EVENT_SUFFIX, ""); @@ -77,7 +77,7 @@ public async void Publish(DomainEvent @event) _logger.EventWasNotProcessed(messageId); } - public void Subscribe() + public Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { @@ -86,11 +86,13 @@ public void Subscribe() _logger.LogInformation("Subscribing to event '{EventName}' with {EventHandler}", eventName, typeof(TH).Name); _subscriptionManager.AddSubscription(); + + return Task.CompletedTask; } - public void StartConsuming() + public async Task StartConsuming() { - _connection.SubscriberClient.StartAsync(OnIncomingEvent); + await _connection.SubscriberClient.StartAsync(OnIncomingEvent); } private static string RemoveDomainEventSuffix(string typeName) diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs index 18ef17c844..d28aafdd82 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/DefaultRabbitMQPersisterConnection.cs @@ -17,7 +17,6 @@ public class DefaultRabbitMqPersistentConnection private readonly int _retryCount; private IConnection? _connection; - private IChannel? _channel; private bool _disposed; public DefaultRabbitMqPersistentConnection(IConnectionFactory connectionFactory, @@ -74,16 +73,16 @@ private async Task ConnectInternal() _logger.LogInformation("RabbitMQ persistent connection acquired a connection to '{hostName}' and is subscribed to failure events", _connection.Endpoint.HostName); - _channel = await _connection.CreateChannelAsync(); - _logger.LogInformation("Created a new channel"); } - public IChannel GetChannel() + public async Task CreateChannel() { - Debug.Assert(_channel != null, nameof(_channel) + " != null"); + Debug.Assert(IsConnected, "RabbitMQ connection is not established"); + + var channel = await _connection!.CreateChannelAsync(); - return _channel; + return channel; } public void Dispose() diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs index 1129b1e2a4..08c2c3f37d 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/EventBusRabbitMQ.cs @@ -52,21 +52,19 @@ public void Dispose() _subsManager.Clear(); } - public async void StartConsuming() + public async Task StartConsuming() { - _consumerChannel = await CreateConsumerChannel(); - var channel = _persistentConnection.GetChannel(); - await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); + await _consumerChannel!.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); if (_consumer is null) { throw new Exception("Cannot start consuming without a consumer set."); } - _consumerChannel.BasicConsumeAsync(_queueName, false, _consumer).GetAwaiter().GetResult(); + await _consumerChannel!.BasicConsumeAsync(_queueName, false, _consumer); } - public async void Publish(DomainEvent @event) + public async Task Publish(DomainEvent @event) { if (!_persistentConnection.IsConnected) await _persistentConnection.Connect(); @@ -94,7 +92,7 @@ await policy.ExecuteAsync(async () => { _logger.LogDebug("Publishing a {EventName} to RabbitMQ.", eventName); - var channel = _persistentConnection.GetChannel(); + await using var channel = await _persistentConnection.CreateChannel(); var properties = new BasicProperties { DeliveryMode = DeliveryModes.Persistent, @@ -112,10 +110,12 @@ await channel.BasicPublishAsync(EXCHANGE_NAME, }); } - public async void Subscribe() + public async Task Subscribe() where T : DomainEvent where TH : IDomainEventHandler { + await EnsureConsumerChannelExists(); + var eventName = _subsManager.GetEventKey(); await DoInternalSubscription(eventName); @@ -124,6 +124,14 @@ public async void Subscribe() _subsManager.AddSubscription(); } + private async Task EnsureConsumerChannelExists() + { + if (_consumerChannel is null) + { + await CreateConsumerChannel(); + } + } + private async Task DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); @@ -138,32 +146,31 @@ private async Task DoInternalSubscription(string eventName) _logger.LogTrace("Trying to bind queue '{QueueName}' on RabbitMQ ...", _queueName); - var channel = _persistentConnection.GetChannel(); - await channel.QueueBindAsync(_queueName, + await _consumerChannel!.QueueBindAsync(_queueName, EXCHANGE_NAME, eventName); _logger.LogTrace("Successfully bound queue '{QueueName}' on RabbitMQ.", _queueName); } - private async Task CreateConsumerChannel() + private async Task CreateConsumerChannel() { if (!_persistentConnection.IsConnected) await _persistentConnection.Connect(); _logger.LogInformation("Creating RabbitMQ consumer channel"); - var channel = _persistentConnection.GetChannel(); + _consumerChannel = await _persistentConnection.CreateChannel(); - await channel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); // TODO: why declare again? This is already up in StartConsuming + await _consumerChannel.ExchangeDeclareAsync(EXCHANGE_NAME, "direct"); // TODO: why declare again? This is already up in StartConsuming - await channel.QueueDeclareAsync(_queueName, + await _consumerChannel.QueueDeclareAsync(_queueName, durable: true, exclusive: false, autoDelete: false ); - _consumer = new AsyncEventingBasicConsumer(channel); + _consumer = new AsyncEventingBasicConsumer(_consumerChannel); _consumer.ReceivedAsync += async (_, eventArgs) => { var eventName = eventArgs.RoutingKey; @@ -178,26 +185,24 @@ await channel.QueueDeclareAsync(_queueName, { await ProcessEvent(eventName, message); - await channel.BasicAckAsync(eventArgs.DeliveryTag, false); + await _consumerChannel.BasicAckAsync(eventArgs.DeliveryTag, false); } } catch (Exception ex) { - await channel.BasicRejectAsync(eventArgs.DeliveryTag, false); + await _consumerChannel.BasicRejectAsync(eventArgs.DeliveryTag, false); _logger.ErrorWhileProcessingDomainEvent(eventName, ex); } }; - channel.CallbackExceptionAsync += async (_, ea) => + _consumerChannel.CallbackExceptionAsync += async (_, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); _consumerChannel?.Dispose(); - _consumerChannel = await CreateConsumerChannel(); + await CreateConsumerChannel(); }; - - return channel; } private async Task ProcessEvent(string eventName, string message) diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs index 10f90943c3..abca43b063 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/EventBus/RabbitMQ/IRabbitMQPersisterConnection.cs @@ -9,5 +9,5 @@ public interface IRabbitMqPersistentConnection Task Connect(); - IChannel GetChannel(); + Task CreateChannel(); } diff --git a/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs b/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs index fe83439b6b..92193f4594 100644 --- a/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs +++ b/BuildingBlocks/src/BuildingBlocks.Infrastructure/Persistence/Database/AbstractDbContextBase.cs @@ -157,7 +157,7 @@ private void PublishDomainEvents(List entities) { foreach (var e in entities) { - _eventBus.Publish(e.DomainEvents); + _ = _eventBus.Publish(e.DomainEvents); e.ClearDomainEvents(); } } diff --git a/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs b/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs index cff720eb89..4c5695eb92 100644 --- a/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs +++ b/BuildingBlocks/test/BuildingBlocks.Infrastructure.Tests/EventBus/GoogleCloudPubSub/GoogleCloudPubSubTests.cs @@ -32,9 +32,9 @@ public async Task One_subscriber_for_one_event() var subscriber = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber.Subscribe(); + await subscriber.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); } @@ -47,10 +47,10 @@ public async Task Subscribe_to_the_same_event_twice_with_the_same_subscriber() var subscriber = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber.Subscribe(); - subscriber.Subscribe(); + await subscriber.Subscribe(); + await subscriber.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldEventuallyHaveOneTriggeredInstance(); @@ -65,10 +65,10 @@ public async Task Two_subscribers_for_the_same_event_both_receive_the_event() var subscriber2 = _factory.CreateEventBus("subscription2"); var publisher = _factory.CreateEventBus(); - subscriber1.Subscribe(); - subscriber2.Subscribe(); + await subscriber1.Subscribe(); + await subscriber2.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldEventuallyHaveOneTriggeredInstance(); @@ -83,10 +83,10 @@ public async Task Only_one_instance_of_a_subscriber_receives_the_event() var subscriber1B = _factory.CreateEventBus("subscription1"); var publisher = _factory.CreateEventBus(); - subscriber1A.Subscribe(); - subscriber1B.Subscribe(); + await subscriber1A.Subscribe(); + await subscriber1B.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); await Task.Delay(5.Seconds()); // wait some time to make sure all subscribers were notified @@ -109,10 +109,10 @@ public async Task The_correct_event_handler_is_called_when_multiple_subscription var subscriber1 = _factory.CreateEventBus(); var publisher = _factory.CreateEventBus(); - subscriber1.Subscribe(); - subscriber1.Subscribe(); + await subscriber1.Subscribe(); + await subscriber1.Subscribe(); - publisher.Publish(new TestEvent1DomainEvent()); + await publisher.Publish(new TestEvent1DomainEvent()); TestEvent1DomainEventHandler1.ShouldEventuallyHaveOneTriggeredInstance(); TestEvent1DomainEventHandler2.ShouldNotHaveAnyTriggeredInstance(); diff --git a/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs b/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs index d0a46778f2..1b2eb363f2 100644 --- a/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs +++ b/Modules/Announcements/src/Announcements.ConsumerApi/AnnouncementsModule.cs @@ -35,7 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } } diff --git a/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs b/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs index cf8a938181..d2a10964d6 100644 --- a/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs +++ b/Modules/Challenges/src/Challenges.ConsumerApi/ChallengesModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { - + return Task.CompletedTask; } } diff --git a/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs b/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs index 271dbdb2de..38626e03e6 100644 --- a/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Devices/src/Devices.Application/Extensions/IEventBusExtensions.cs @@ -13,27 +13,27 @@ namespace Backbone.Modules.Devices.Application.Extensions; public static class IEventBusExtensions { - public static void AddDevicesDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddDevicesDomainEventSubscriptions(this IEventBus eventBus) { - eventBus.SubscribeToAnnouncementsEvents(); - eventBus.SubscribeToDevicesEvents(); - eventBus.SubscribeToSynchronizationEvents(); + await eventBus.SubscribeToAnnouncementsEvents(); + await eventBus.SubscribeToDevicesEvents(); + await eventBus.SubscribeToSynchronizationEvents(); } - private static void SubscribeToAnnouncementsEvents(this IEventBus eventBus) + private static async Task SubscribeToAnnouncementsEvents(this IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToDevicesEvents(this IEventBus eventBus) + private static async Task SubscribeToDevicesEvents(this IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToSynchronizationEvents(this IEventBus eventBus) + private static async Task SubscribeToSynchronizationEvents(this IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs b/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs index 33a1549323..3ee73ea72c 100644 --- a/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs +++ b/Modules/Devices/src/Devices.Application/Tiers/Commands/DeleteTier/Handler.cs @@ -41,6 +41,6 @@ public async Task Handle(DeleteTierCommand request, CancellationToken cancellati await _tiersRepository.Remove(tier); - _eventBus.Publish(new TierDeletedDomainEvent(tier)); + _ = _eventBus.Publish(new TierDeletedDomainEvent(tier)); } } diff --git a/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs b/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs index a4a31a573a..a3dacdb5c7 100644 --- a/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs +++ b/Modules/Devices/src/Devices.ConsumerApi/DevicesModule.cs @@ -40,9 +40,9 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddDevicesDomainEventSubscriptions(); + await eventBus.AddDevicesDomainEventSubscriptions(); } public override void PostStartupValidation(IServiceProvider serviceProvider) diff --git a/Modules/Files/src/Files.ConsumerApi/FilesModule.cs b/Modules/Files/src/Files.ConsumerApi/FilesModule.cs index 9b3e0375d6..97796f62ac 100644 --- a/Modules/Files/src/Files.ConsumerApi/FilesModule.cs +++ b/Modules/Files/src/Files.ConsumerApi/FilesModule.cs @@ -37,7 +37,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } } diff --git a/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs b/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs index 5dcd2ed112..3905da63cb 100644 --- a/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Messages/src/Messages.Application/Extensions/IEventBusExtensions.cs @@ -8,19 +8,19 @@ namespace Backbone.Modules.Messages.Application.Extensions; public static class IEventBusExtensions { - public static void AddMessagesDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddMessagesDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToMessagesEvents(eventBus); - SubscribeToRelationshipsEvents(eventBus); + await SubscribeToMessagesEvents(eventBus); + await SubscribeToRelationshipsEvents(eventBus); } - private static void SubscribeToMessagesEvents(IEventBus eventBus) + private static async Task SubscribeToMessagesEvents(IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToRelationshipsEvents(IEventBus eventBus) + private static async Task SubscribeToRelationshipsEvents(IEventBus eventBus) { - eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs b/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs index e8359c2e4a..2cf9c8e8ea 100644 --- a/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs +++ b/Modules/Messages/src/Messages.ConsumerApi/MessagesModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddMessagesDomainEventSubscriptions(); + await eventBus.AddMessagesDomainEventSubscriptions(); } } diff --git a/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs b/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs index c8075e250d..3af336534a 100644 --- a/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Quotas/src/Quotas.Application/Extensions/IEventBusExtensions.cs @@ -25,23 +25,23 @@ namespace Backbone.Modules.Quotas.Application.Extensions; public static class IEventBusExtensions { - public static void AddQuotasDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddQuotasDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToSynchronizationEvents(eventBus); + await SubscribeToSynchronizationEvents(eventBus); } - private static void SubscribeToSynchronizationEvents(IEventBus eventBus) + private static async Task SubscribeToSynchronizationEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs b/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs index 01b3073006..c39bfd1174 100644 --- a/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs +++ b/Modules/Quotas/src/Quotas.ConsumerApi/QuotasModule.cs @@ -37,8 +37,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati services.AddResponseCaching(); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddQuotasDomainEventSubscriptions(); + await eventBus.AddQuotasDomainEventSubscriptions(); } } diff --git a/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs b/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs index 1949320c73..4f95b76447 100644 --- a/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Relationships/src/Relationships.Application/Extensions/IEventBusExtensions.cs @@ -8,15 +8,15 @@ namespace Backbone.Modules.Relationships.Application.Extensions; public static class IEventBusExtensions { - public static void AddRelationshipsDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddRelationshipsDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToIdentitiesEvents(eventBus); + await SubscribeToIdentitiesEvents(eventBus); } - private static void SubscribeToIdentitiesEvents(IEventBus eventBus) + private static async Task SubscribeToIdentitiesEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs b/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs index 0b5631ed0a..f259727a42 100644 --- a/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs +++ b/Modules/Relationships/src/Relationships.ConsumerApi/RelationshipsModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddRelationshipsDomainEventSubscriptions(); + await eventBus.AddRelationshipsDomainEventSubscriptions(); } } diff --git a/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs b/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs index 0ce6264bca..b9da878d52 100644 --- a/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs +++ b/Modules/Synchronization/src/Synchronization.Application/Extensions/IEventBusExtensions.cs @@ -22,26 +22,26 @@ namespace Backbone.Modules.Synchronization.Application.Extensions; public static class IEventBusExtensions { - public static void AddSynchronizationDomainEventSubscriptions(this IEventBus eventBus) + public static async Task AddSynchronizationDomainEventSubscriptions(this IEventBus eventBus) { - SubscribeToMessagesEvents(eventBus); - SubscribeToRelationshipsEvents(eventBus); + await SubscribeToMessagesEvents(eventBus); + await SubscribeToRelationshipsEvents(eventBus); } - private static void SubscribeToMessagesEvents(IEventBus eventBus) + private static async Task SubscribeToMessagesEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } - private static void SubscribeToRelationshipsEvents(IEventBus eventBus) + private static async Task SubscribeToRelationshipsEvents(IEventBus eventBus) { - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); - eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); + await eventBus.Subscribe(); } } diff --git a/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs b/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs index 4c73a84e25..96f20fdc59 100644 --- a/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs +++ b/Modules/Synchronization/src/Synchronization.ConsumerApi/SynchronizationModule.cs @@ -35,8 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override async Task ConfigureEventBus(IEventBus eventBus) { - eventBus.AddSynchronizationDomainEventSubscriptions(); + await eventBus.AddSynchronizationDomainEventSubscriptions(); } } diff --git a/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs b/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs index 9dacc76eb4..dfd60f1e93 100644 --- a/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs +++ b/Modules/Tags/src/Tags.ConsumerApi/TagsModule.cs @@ -20,8 +20,9 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati services.AddPersistence(); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { // No Event bus needed here + return Task.CompletedTask; } } diff --git a/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs b/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs index aac517056f..a61507d541 100644 --- a/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs +++ b/Modules/Tokens/src/Tokens.ConsumerApi/TokensModule.cs @@ -35,7 +35,8 @@ public override void ConfigureServices(IServiceCollection services, IConfigurati parsedConfiguration.Infrastructure.SqlDatabase.ConnectionString); } - public override void ConfigureEventBus(IEventBus eventBus) + public override Task ConfigureEventBus(IEventBus eventBus) { + return Task.CompletedTask; } }