This library is a wrapper around Azure Service Bus. Its main goals are:
- to send a command to a queue or publish an event to a particular topic,
- to listen to an Azure Service Bus queue, or to a topic through a subscription,
- to parse each message into a predefined, strongly typed object when it arrives,
- to pass the parsed object to a predefined handler.
Dependencies:
- Amido.Stacks.Configuration
- Amido.Stacks.Application.CQRS.Abstractions
- Amido.Stacks.DependencyInjection
- Microsoft.Azure.ServiceBus
- Microsoft.Extensions.Hosting
The library currently supports:
- Sending and receiving commands implementing Amido.Stacks.Application.CQRS.Commands.ICommand.
- Publishing and receiving events implementing Amido.Stacks.Application.CQRS.ApplicationEvents.IApplicationEvent.
As an example, NotifyCommandHandler is the handler for NotifyCommand. The handler implements the Amido.Stacks.Application.CQRS.Commands.ICommandHandler<NotifyCommand, bool> interface and the command implements the Amido.Stacks.Application.CQRS.Commands.ICommand interface.
NotifyCommand.cs
public class NotifyCommand : ICommand
{
    public NotifyCommand(Guid correlationId, string testMember)
    {
        OperationCode = 666;
        CorrelationId = correlationId;
        TestMember = testMember;
    }

    public string TestMember { get; }
    public int OperationCode { get; }
    public Guid CorrelationId { get; }
}
NotifyCommandHandler.cs
public class NotifyCommandHandler : ICommandHandler<NotifyCommand, bool>
{
    private readonly ITestable<NotifyCommand> _testable;

    public NotifyCommandHandler(ITestable<NotifyCommand> testable)
    {
        _testable = testable;
    }

    public Task<bool> HandleAsync(NotifyCommand command)
    {
        _testable.Complete(command);
        return Task.FromResult(true);
    }
}
The command dispatcher's responsibility is to send a command message to a preconfigured queue. The FullName of the command type, such as Amido.Stacks.Messaging.Commands.NotifyCommand, is paired with a queue name in the Routes configuration. Each queue has exactly one message sender, so the queue name used in a route (e.g. notifications-command) has to match the name of a queue defined in the Queues list.
The configuration for the command dispatcher is in the Sender section of ServiceBusConfiguration.
Queues | Queue Routes | Behaviour |
---|---|---|
1 queue is defined | no routing is defined | sends all messages* |
1 queue is defined | 1 route is defined for one type | sends only mapped messages** |
2 or more queues are defined | no routing is defined | all commands will fail*** |
2 or more queues are defined | 1 or more routes are defined | routed messages will be sent |
*All messages default to the single queue.
**The routing works as a filter: routed messages are sent, and non-routed ones throw a MessageRouteNotDefined exception.
***A routing configuration is required when more than one queue is defined.
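The routing table can be read as a small decision procedure. The following is a minimal sketch of that procedure, not the library's actual implementation: RouteResolver and RouteNotDefinedException are hypothetical names introduced for illustration, with the exception standing in for the MessageRouteNotDefined exception mentioned in the notes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the library's MessageRouteNotDefined exception.
public class RouteNotDefinedException : Exception
{
    public RouteNotDefinedException(string typeName)
        : base($"No route is defined for '{typeName}'.") { }
}

// Hypothetical helper that mirrors the routing table; not part of the library.
public class RouteResolver
{
    private readonly IReadOnlyList<string> _queues;
    // Maps the FullName of a message type to a queue name.
    private readonly IReadOnlyDictionary<string, string> _routes;

    public RouteResolver(IReadOnlyList<string> queues, IReadOnlyDictionary<string, string> routes)
    {
        _queues = queues;
        _routes = routes;
    }

    public string Resolve(Type messageType)
    {
        var typeName = messageType.FullName ?? messageType.Name;

        // Routes are defined: they act as a filter, so only mapped types are sent.
        if (_routes.Count > 0)
        {
            return _routes.TryGetValue(typeName, out var queue)
                ? queue
                : throw new RouteNotDefinedException(typeName);
        }

        // No routes and exactly one queue: every message defaults to that queue.
        if (_queues.Count == 1)
        {
            return _queues[0];
        }

        // No routes and several queues: the target is ambiguous, so sending fails.
        throw new RouteNotDefinedException(typeName);
    }
}
```

With a single queue and an empty route map, Resolve returns that queue for any type; with two queues and an empty route map, it throws for every type. The same reasoning would apply to topics and topic routes.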
appsettings.json
{
    "ServiceBusConfiguration": {
        "Sender": {
            "Queues": [
                {
                    "Name": "notifications-command",
                    "ConnectionStringSecret": {
                        "Identifier": "SERVICEBUS_CONNECTIONSTRING",
                        "Source": "Environment"
                    }
                }
            ],
            "Routes": {
                "QueueRoutes": [
                    {
                        "Name": "notifications-command",
                        "Types": [
                            "Amido.Stacks.Messaging.Commands.NotifyCommand"
                        ]
                    }
                ]
            }
        }
    }
}
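Startup.cs
public class Startup
{
    private readonly IConfiguration _configuration;

    public Startup(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        // Bind the configuration, register the handler, then add the Service Bus services
        services
            .AddTransient<Consumer>()
            .Configure<ServiceBusConfiguration>(_configuration.GetSection("ServiceBusConfiguration"))
            .AddTransient<ICommandHandler<NotifyCommand, bool>, NotifyCommandHandler>()
            .AddServiceBus();
    }
}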
public class Consumer
{
    private readonly ICommandDispatcher _dispatcher;

    public Consumer(ICommandDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public async Task SendIt(Data dataToSend)
    {
        // Example usage for an example command
        await _dispatcher.SendAsync(new NotifyCommand(... , dataToSend, ...));
    }
}
The listener can listen to many queues. Each queue entry has the following settings:
- Name is the name of the queue,
- ConcurrencyLevel is the MaxConcurrentCalls value,
- DisableProcessing, when true, stops the listener from registering with (listening to) Service Bus,
- DisableMessageValidation, when true, skips the validation of incoming messages.
The configuration for the listener is in the Listener section of ServiceBusConfiguration.
appsettings.json
{
    "ServiceBusConfiguration": {
        "Listener": {
            "Queues": [
                {
                    "Name": "notifications",
                    "ConcurrencyLevel": 5,
                    "DisableProcessing": false,
                    "ConnectionStringSecret": {
                        "Identifier": "SERVICEBUS_CONNECTIONSTRING",
                        "Source": "Environment"
                    },
                    "DisableMessageValidation": true
                }
            ]
        }
    }
}
Program.cs
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureAppConfiguration(builder =>
        {
            // Add the configuration file
            builder.SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true);
        })
        .ConfigureServices((hostContext, services) =>
        {
            // Register the command handler before adding the Service Bus services
            services
                .AddLogging()
                .AddSecrets()
                .Configure<ServiceBusConfiguration>(hostContext.Configuration.GetSection("ServiceBusConfiguration"))
                .AddTransient<ICommandHandler<NotifyCommand, bool>, NotifyCommandHandler>()
                .AddServiceBus();
        });
In this example, NotifyEvent is handled by NotifyEventHandler. The handler implements the Amido.Stacks.Application.CQRS.ApplicationEvents.IApplicationEventHandler<NotifyEvent> interface and the event implements the Amido.Stacks.Application.CQRS.ApplicationEvents.IApplicationEvent interface.
NotifyEvent.cs
public class NotifyEvent : IApplicationEvent
{
    public int OperationCode { get; }
    public Guid CorrelationId { get; }
    public int EventCode { get; }

    public NotifyEvent(int operationCode, Guid correlationId, int eventCode)
    {
        OperationCode = operationCode;
        CorrelationId = correlationId;
        EventCode = eventCode;
    }
}
NotifyEventHandler.cs
public class NotifyEventHandler : IApplicationEventHandler<NotifyEvent>
{
    private readonly ITestable<NotifyEvent> _testable;

    public NotifyEventHandler(ITestable<NotifyEvent> testable)
    {
        _testable = testable;
    }

    public Task HandleAsync(NotifyEvent applicationEvent)
    {
        _testable.Complete(applicationEvent);
        return Task.CompletedTask;
    }
}
The event publisher's responsibility is to publish an event message to a preconfigured topic. The topic for an event depends on the Routes configuration. The following routing table shows the different configurations:
Topics | Topic Routes | Behaviour |
---|---|---|
1 topic is defined | no routing is defined | publishes all messages* |
1 topic is defined | 1 route is defined for one type | publishes only mapped messages** |
2 or more topics are defined | no routing is defined | all events will fail*** |
2 or more topics are defined | 1 or more routes are defined | routed messages will be published |
*All messages default to the single topic.
**The routing works as a filter: routed messages are published, and non-routed ones throw a MessageRouteNotDefined exception.
***A routing configuration is required when more than one topic is defined.
appsettings.json
{
    "ServiceBusConfiguration": {
        "Sender": {
            "Topics": [
                {
                    "Name": "notifications-event",
                    "ConnectionStringSecret": {
                        "Identifier": "SERVICEBUS_CONNECTIONSTRING",
                        "Source": "Environment"
                    }
                }
            ],
            "Routes": {
                "TopicRoutes": [
                    {
                        "Name": "notifications-event",
                        "Types": [
                            "Amido.Stacks.Messaging.Commands.NotifyEvent"
                        ]
                    }
                ]
            }
        }
    }
}
Startup.cs
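public class Startup
{
    private readonly IConfiguration _configuration;

    public Startup(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        services
            .AddTransient<Consumer>()
            .Configure<ServiceBusConfiguration>(_configuration.GetSection("ServiceBusConfiguration"))
            .AddServiceBus();
    }
}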
public class Consumer
{
    private readonly IApplicationEventPublisher _eventPublisher;

    public Consumer(IApplicationEventPublisher eventPublisher)
    {
        _eventPublisher = eventPublisher;
    }

    public async Task PublishIt(Data dataToSend)
    {
        // Example usage for an example event
        await _eventPublisher.PublishAsync(new NotifyEvent(... , dataToSend, ...));
    }
}
The listener can listen to many topics. Each topic entry has the following settings:
- Name is the name of the topic,
- SubscriptionName is the name of the subscription on that topic,
- ConcurrencyLevel is the MaxConcurrentCalls value,
- DisableProcessing, when true, stops the listener from registering with (listening to) Service Bus,
- DisableMessageValidation, when true, skips the validation of incoming messages.
The configuration for the event listener is in the Listener section of ServiceBusConfiguration.
appsettings.json
{
    "ServiceBusConfiguration": {
        "Listener": {
            "Topics": [
                {
                    "Name": "notifications",
                    "SubscriptionName": "notification-subscription",
                    "ConcurrencyLevel": 5,
                    "DisableProcessing": false,
                    "ConnectionStringSecret": {
                        "Identifier": "SERVICEBUS_CONNECTIONSTRING",
                        "Source": "Environment"
                    },
                    "DisableMessageValidation": true
                }
            ]
        }
    }
}
Program.cs
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureAppConfiguration(builder =>
        {
            // Add the configuration file
            builder.SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true);
        })
        .ConfigureServices((hostContext, services) =>
        {
            // Add the Service Bus services and register the event handler
            services
                .AddLogging()
                .AddSecrets()
                .Configure<ServiceBusConfiguration>(hostContext.Configuration.GetSection("ServiceBusConfiguration"))
                .AddServiceBus()
                .AddTransient<IApplicationEventHandler<NotifyEvent>, NotifyEventHandler>();
        });
Unrecoverable exceptions are those thrown when parsing a message into an object fails because the message is in an invalid state. Retrying such a message would only produce the same exception, so it is moved to the dead-letter queue together with the reason it was placed there.
The unrecoverable exceptions are:
- Amido.Stacks.Messaging.Azure.ServiceBus.Exceptions.UnrecoverableException: general exception.
- Amido.Stacks.Messaging.Azure.ServiceBus.Exceptions.MessageParsingException: parsing-related exception.
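To illustrate the distinction, here is a minimal sketch of how a listener might decide what to do with a message. It is not the library's code: ParsingFailedException (a stand-in for MessageParsingException), Disposition, NotifyPayload and MessageTriage are hypothetical names, System.Text.Json is used only for the example, and the assumption that every other failure is retried is inferred from the paragraph above rather than stated by the library.

```csharp
using System;
using System.Text.Json;

// Hypothetical stand-in for the library's MessageParsingException.
public class ParsingFailedException : Exception
{
    public ParsingFailedException(string reason, Exception inner) : base(reason, inner) { }
}

// Hypothetical outcomes a listener could choose between.
public enum Disposition { Complete, Retry, DeadLetter }

// Hypothetical message shape used only for this example.
public class NotifyPayload
{
    public Guid CorrelationId { get; set; }
    public string TestMember { get; set; }
}

public static class MessageTriage
{
    // Parses the body and converts serializer failures into the unrecoverable exception type.
    public static NotifyPayload Parse(string body)
    {
        try
        {
            return JsonSerializer.Deserialize<NotifyPayload>(body)
                ?? throw new ParsingFailedException("Message body was a JSON null.", null);
        }
        catch (JsonException ex)
        {
            throw new ParsingFailedException("Message body is not valid JSON for NotifyPayload.", ex);
        }
    }

    // Runs the handler and reports what should happen to the message afterwards.
    public static (Disposition Outcome, string Reason) Process(string body, Action<NotifyPayload> handler)
    {
        try
        {
            handler(Parse(body));
            return (Disposition.Complete, null);
        }
        catch (ParsingFailedException ex)
        {
            // Retrying cannot fix a malformed message, so dead-letter it with the reason.
            return (Disposition.DeadLetter, ex.Message);
        }
        catch (Exception ex)
        {
            // Assumption: any other failure may be transient, so leave the message for a retry.
            return (Disposition.Retry, ex.Message);
        }
    }
}
```

In this sketch a malformed body is dead-lettered on the first attempt, while a handler that throws for another reason leaves the message eligible for redelivery.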