
Projecting Events

Charles Solar edited this page Jun 13, 2018 · 2 revisions

The events raised by entities are received on clients, which use them to update databases. They are delivered via the NServiceBus pipeline, so your event handlers look exactly the same as if you were publishing your events to a transport like RabbitMQ.

Here in Handler.cs we project buyer-related events into an Elasticsearch index.

public class Handler : 
    IHandleQueries<Queries.Buyers>,
    IHandleMessages<Events.Initiated>,
    IHandleMessages<Events.InGoodStanding>,
    IHandleMessages<Events.PreferredAddressSet>,
    IHandleMessages<Events.PreferredPaymentSet>,
    IHandleMessages<Events.Suspended>,
    IHandleMessages<Order.Events.Drafted>,
    IHandleMessages<Order.Events.Paid>,
    IHandleMessages<Order.Events.Canceled>
{
    public async Task Handle(Queries.Buyers query, IMessageHandlerContext ctx)
    {
        ...
    }
    public Task Handle(Events.Initiated e, IMessageHandlerContext ctx)
    {
        var model = new Models.OrderingBuyerIndex
        {
            Id = e.UserName,
            GivenName = e.GivenName,
            GoodStanding = true
        };

        return ctx.App<Infrastructure.IUnitOfWork>().Add(e.UserName, model);
    }

    public async Task Handle(Events.InGoodStanding e, IMessageHandlerContext ctx)
    {
        var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
        buyer.GoodStanding = true;
        await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
    }
    public async Task Handle(Events.PreferredAddressSet e, IMessageHandlerContext ctx)
    {
        var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
        var address = await ctx.App<Infrastructure.IUnitOfWork>().Get<Entities.Address.Models.Address>(e.AddressId)
            .ConfigureAwait(false);

        buyer.PreferredCity = address.City;
        buyer.PreferredState = address.State;
        buyer.PreferredZipCode = address.ZipCode;
        buyer.PreferredCountry = address.Country;

        await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
    }
    public async Task Handle(Events.PreferredPaymentSet e, IMessageHandlerContext ctx)
    {
        var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
        var method = await ctx.App<Infrastructure.IUnitOfWork>()
            .Get<Entities.PaymentMethod.Models.PaymentMethod>(e.PaymentMethodId).ConfigureAwait(false);

        buyer.PreferredPaymentCardholder = method.CardholderName;
        buyer.PreferredPaymentMethod = method.CardType;
        buyer.PreferredPaymentExpiration = method.Expiration.ToString("MM/yy");
        
        await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
    }

    // Handlers for Events.Suspended and the Order.Events.* messages are omitted here.
}

Notice that subscribing to an event only requires implementing IHandleMessages<Events.Initiated>. Aggregates.NET scans for these implementations and automatically creates a projection in EventStore so that the handler receives the events as part of a subscriber group. As the handlers show, a generic unit of work then creates and updates the read model from each event.
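To make the "generic unit of work" idea concrete, here is a minimal in-memory sketch of the three calls the handlers make (Add, Get and Update). The interface shape is inferred only from the call sites in Handler.cs; the real Infrastructure.IUnitOfWork in the sample may differ. The string ids, the InMemoryUnitOfWork class, and the choice to return null for a missing document are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical interface, inferred from how the handlers above call it.
public interface IUnitOfWork
{
    Task Add<T>(string id, T document) where T : class;
    Task<T> Get<T>(string id) where T : class;
    Task Update<T>(string id, T document) where T : class;
}

// In-memory stand-in for an index-backed store; for illustration only.
public class InMemoryUnitOfWork : IUnitOfWork
{
    // Documents are keyed by model type plus id, so different models
    // can share the same id without colliding.
    private readonly Dictionary<(Type, string), object> _store =
        new Dictionary<(Type, string), object>();

    public Task Add<T>(string id, T document) where T : class
    {
        // Dictionary.Add throws if the key already exists.
        _store.Add((typeof(T), id), document);
        return Task.CompletedTask;
    }

    public Task<T> Get<T>(string id) where T : class
    {
        // Assumption: a missing document yields null rather than an exception.
        _store.TryGetValue((typeof(T), id), out var found);
        return Task.FromResult(found as T);
    }

    public Task Update<T>(string id, T document) where T : class
    {
        // Overwrites the stored document for this type and id.
        _store[(typeof(T), id)] = document;
        return Task.CompletedTask;
    }
}
```

A stand-in like this can be useful for exercising projection handlers in tests without a running index, provided the real unit of work is resolved through an interface of a similar shape.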

We can even load other entities to project their data into our model like so:

public async Task Handle(Events.PreferredAddressSet e, IMessageHandlerContext ctx)
{
    var buyer = await ctx.App<Infrastructure.IUnitOfWork>().Get<Models.OrderingBuyerIndex>(e.UserName).ConfigureAwait(false);
    var address = await ctx.App<Infrastructure.IUnitOfWork>().Get<Entities.Address.Models.Address>(e.AddressId)
        .ConfigureAwait(false);

    buyer.PreferredCity = address.City;
    buyer.PreferredState = address.State;
    buyer.PreferredZipCode = address.ZipCode;
    buyer.PreferredCountry = address.Country;

    await ctx.App<Infrastructure.IUnitOfWork>().Update(e.UserName, buyer).ConfigureAwait(false);
}

Of course, loading the Address above only works if that model has already been projected into Elasticsearch.
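One way to make that dependency explicit is to fail loudly when the address has not been projected yet. The sketch below is not part of the sample: the trimmed-down model classes, the PreferredAddressProjection helper, and the assumption that a missing model comes back as null are all hypothetical. Throwing from a handler hands the message to NServiceBus's retry handling, so the event may be attempted again once the address exists.

```csharp
using System;

// Hypothetical, trimmed-down copies of the models used above.
public class Address
{
    public string City { get; set; }
    public string State { get; set; }
    public string ZipCode { get; set; }
    public string Country { get; set; }
}

public class OrderingBuyerIndex
{
    public string PreferredCity { get; set; }
    public string PreferredState { get; set; }
    public string PreferredZipCode { get; set; }
    public string PreferredCountry { get; set; }
}

public static class PreferredAddressProjection
{
    // Copies the address fields onto the buyer model, refusing to
    // proceed when the address has not been projected yet.
    public static void Apply(OrderingBuyerIndex buyer, Address address, string addressId)
    {
        if (buyer == null) throw new ArgumentNullException(nameof(buyer));
        if (address == null)
            throw new InvalidOperationException(
                $"Address {addressId} has not been projected yet.");

        buyer.PreferredCity = address.City;
        buyer.PreferredState = address.State;
        buyer.PreferredZipCode = address.ZipCode;
        buyer.PreferredCountry = address.Country;
    }
}
```

Inside the handler, the helper would be called between the two Get calls and the final Update.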
