Transport message forwarding

Mogens Heller Grabe edited this page Sep 14, 2020 · 3 revisions

If your Rebus endpoint is meant to function as a router, i.e. an endpoint whose only purpose in life is to receive messages and immediately forward them to another queue, then Rebus provides a nice API for this.

It's also a good way to create a generic message handler for messages whose corresponding C# types might not be known upfront, e.g. by looking at the headers of the incoming transport message, deserializing it into a JObject (assuming that it's JSON), etc.

You configure it by calling AddTransportMessageForwarder in the .Routing(..) configurer, e.g. like this:

Configure.With(...)
    .(...)
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage => {
        var headers = transportMessage.Headers; //< Dictionary<string, string>
        var body = transportMessage.Body;       //< byte[]

        // return action here:
        return ForwardAction.ForwardTo("another-queue");
    }))
    .Start();

which in this case will forward all received messages to the another-queue queue.

This API can easily be used to e.g. implement a simple round-robin load balancer, distributing messages evenly among a number of worker queues:

var counter = 0L;
var queues = new[] { "queue-a", "queue-b", "queue-c" };

string GetQueueName() => queues[Interlocked.Increment(ref counter) % queues.Length];

// (...)....

Configure.With(activator)
    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "router-tjek"))
    .Routing(r => r.AddTransportMessageForwarder(async transportMessage =>
    {
        var headers = transportMessage.Headers; //< Dictionary<string, string>
        var body = transportMessage.Body;       //< byte[]

        // return action here:
        return ForwardAction.ForwardTo(GetQueueName());
    }))
    .Start();

By returning different ForwardActions, the forwarder can decide what to do about the transport message – e.g. forward to multiple queues:

r.AddTransportMessageForwarder(async transportMessage =>
{
    var destinations = new[] {"one-queue", "another-queue"};
    
    return ForwardAction.ForwardTo(destinations);
});

or let the message be handled normally:

r.AddTransportMessageForwarder(async transportMessage => ForwardAction.None);

or simply discard the message completely:

r.AddTransportMessageForwarder(async transportMessage => ForwardAction.Ignore());
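
The generic-handler idea mentioned at the top of this page could be sketched roughly as follows. This is only an illustration under a few assumptions that are not part of Rebus itself: the body is UTF-8 encoded JSON with an object at its root, Newtonsoft.Json is referenced, and the class name ContentBasedForwarder, the "OrderId" property and the "orders-queue" queue name are all made up for the example.

```csharp
using System;
using System.Text;
using Newtonsoft.Json.Linq;
using Rebus.Messages;
using Rebus.Routing.TransportMessages;

// Hypothetical helper, not part of Rebus
public static class ContentBasedForwarder
{
    public static ForwardAction Decide(TransportMessage transportMessage)
    {
        var headers = transportMessage.Headers;

        // Only try to parse bodies that declare themselves as JSON
        var isJson = headers.TryGetValue(Headers.ContentType, out var contentType)
                     && contentType.StartsWith("application/json", StringComparison.OrdinalIgnoreCase);

        // Anything else is left to the normal handler pipeline
        if (!isJson) return ForwardAction.None;

        // Assumption: the body is UTF-8 text with a JSON object at its root
        var json = JObject.Parse(Encoding.UTF8.GetString(transportMessage.Body));

        // "OrderId" and "orders-queue" are made-up names for this sketch
        return json["OrderId"] != null
            ? ForwardAction.ForwardTo("orders-queue")
            : ForwardAction.None;
    }
}
```

It would then be plugged in with r.AddTransportMessageForwarder(async transportMessage => ContentBasedForwarder.Decide(transportMessage)). Note that JObject.Parse throws on malformed JSON, so a real router would probably want to decide explicitly what should happen to such messages.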