Azure Functions Bus is a simple framework that creates a message bus on top of the Azure Functions infrastructure. That way you can create a distributed system using a serverless technology on top of the Azure Storage.
- You can gather different functions into one host.
- You can have just one queue per service.
- You can have sagas.
- Grab the nuget package for AFBus.
- Define the connection string in the host.json
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true"
}
or in the appsettings.json or local.settings.json if you want to modify the tests
{
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"AzureWebJobsDashboard": "UseDevelopmentStorage=true",
"LockSagas": "True"
}
The system gets divided in different parts:
- Sagas (Folder)
- SagaA.Host (Project)
- Sagas
- SagaA.Messages (Project)
- SagaA.Tests
- SagaA.Host (Project)
- Services(Folder)
- ServiceA.Host (Project)
- Handlers
- ServiceA.Messages (Project)
- ServiceA.Tests
- ServiceA.Host (Project)
Just POCO classes that are shared between different services to communicate to each other.
Project with the functions that are the entrance of the service. It must create the handler container.
//here the dlls are scanned looking for handlers
private static HandlersContainer container = new HandlersContainer();
This container looks for every IHandle<> implementation. Each message gets passed to its handlers when the handle method of the container is called.
public static class ShippingService
{
static HandlersContainer container = new HandlersContainer();
[FunctionName("ShippingServiceEndpointFunction")]
public static async Task Run([QueueTrigger("shippingservice", Connection = "")]string myQueueItem, ILogger log)
{
//Calls to every handler that receives that message
await container.HandleAsync(myQueueItem, new Logger(log));
}
}
Defining a stateless handler is just implementing the IHandle interface in a class. For instance:
public class ShipOrderHandler : IHandle<ShipOrder>
{
IShippingRepository rep;
public ShipOrderHandler(IShippingRepository rep)
{
this.rep = rep;
}
public async Task HandleAsync(IBus bus, ShipOrder message, ILogger Log)
{
Log.Info("order shipped");
rep.AddOrderShipped(new OrderShipped { User = message.UserName });
await bus.SendAsync(new ShipOrderResponse() { UserName = message.UserName }, message.ReplyTo);
}
}
Sagas are stateful components that orchestrates differents messages. In a saga you must define three parts:
- SagaData: the data that will be stored between messages.
- IHandleStartingSaga: handlers that creates sagas (the first message received by the saga).
- IHandleWithCorrelation: handlers that correlates in the saga. You need to implement two methods, the one with the logic and the one with the correlation logic.
public class SimpleTestSaga : Saga<SimpleTestSagaData>, IHandleStartingSaga<SimpleSagaStartingMessage>, IHandleWithCorrelation<SimpleSagaIntermediateMessage>, IHandleWithCorrelation<SimpleSagaTerminatingMessage>
{
private const string PARTITION_KEY = "SimpleTestSaga";
public Task HandleAsync(IBus bus, SimpleSagaStartingMessage input, ILogger Log)
{
this.Data.PartitionKey = PARTITION_KEY;
this.Data.RowKey = input.Id.ToString();
this.Data.Counter++;
return Task.CompletedTask;
}
public Task HandleAsync(IBus bus, SimpleSagaIntermediateMessage input, ILogger Log)
{
this.Data.Counter++;
return Task.CompletedTask;
}
public async Task HandleAsync(IBus bus, SimpleSagaTerminatingMessage message, ILogger Log)
{
await this.DeleteSaga();
}
public async Task<SagaData> LookForInstance(SimpleSagaIntermediateMessage message)
{
var sagaData = await sagaPersistence.GetSagaData<SimpleTestSagaData>(PARTITION_KEY, message.Id.ToString());
return sagaData;
}
public async Task<SagaData> LookForInstance(SimpleSagaTerminatingMessage message)
{
var sagaData = await sagaPersistence.GetSagaData<SimpleTestSagaData>(PARTITION_KEY, message.Id.ToString());
return sagaData;
}
}
Messages outside the AFBus framework can be launched using the SendOnlyBus class.
SendOnlyBus.SendAsync(message, SERVICENAME).Wait();
Messages inside the framework can be launched using the bus object parameter in the handler method.
bus.SendAsync(new PayOrderResponse() { UserName = "pablo"}, message.ReplyTo);
Dependency injection can be set using special methods for it
Here a dependency is set in the static constructor
public static class ShippingService
{
static HandlersContainer container = new HandlersContainer();
static ShippingService()
{
HandlersContainer.AddDependency<IShippingRepository, InMemoryShippingRepository>();
}
[FunctionName("ShippingServiceEndpointFunction")]
public static async Task Run([QueueTrigger("shippingservice")]string myQueueItem, ILogger log)
{
log.Info($"C# Queue trigger function processed: {myQueueItem}");
await container.HandleAsync(myQueueItem, log);
}
}
Here the dependency is injected into the constructor
public class ShipOrderHandler : IHandle<ShipOrder>
{
IShippingRepository rep;
public ShipOrderHandler(IShippingRepository rep)
{
this.rep = rep;
}
public async Task HandleAsync(IBus bus, ShipOrder message, ILogger Log)
{
Log.Info("order shipped");
rep.AddOrderShipped(new OrderShipped { User = message.UserName });
await bus.SendAsync(new ShipOrderResponse() { UserName = message.UserName }, message.ReplyTo);
}
}
Look into AFBusService.cs in the ASP.NET Core example. There you can see how the UI receives commands in its own queue and sends a SignalR message to the UI.
If messages are bigger than 65k the body of it is saved in a blob storage that will be read in the reception.