- General usage of publish and subscribe
- Publish and subscribe using RabbitMQ topic exchange wildcards
- Integrating with the `SubscriptionManager`
- Logging with `BunnyBus` Logging events
Configure and register handlers for a couple of queues. When a subscribed event is published to one of those queues, it will be processed by the matching handler.
// Subscribe example: attach a logger to the bus and register one handler map per queue.
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));
// pino exports a factory function; call it to obtain an actual logger instance.
const logger = require('pino')();
bunnyBus.logger = logger;

// Each key is the event name a published message must carry to reach that handler.
const handlersForQueue1 = {
    'message-created': async (message, ack, reject, requeue) => {
        //write to elasticsearch
        await ack();
    }
};

const handlersForQueue2 = {
    'message-deleted': async (message, ack, reject, requeue) => {
        //delete from elastic search
        await ack();
    }
};

// NOTE: `await` here assumes the snippet runs inside an async function (or an ES module).
try {
    // The two subscriptions are independent, so register them in parallel.
    await Promise.all([
        bunnyBus.subscribe('queue1', handlersForQueue1),
        bunnyBus.subscribe('queue2', handlersForQueue2)
    ]);
}
catch (err) {
    // pino serializes an Error only when it is passed as the first argument.
    logger.error(err, 'failed to subscribe');
}
With the above handlers registered, let's publish some events to the bus.
// Publish example: send two events to the bus in parallel.
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));
// pino exports a factory function; call it to obtain an actual logger instance.
const logger = require('pino')();
bunnyBus.logger = logger;

// NOTE: `await` here assumes the snippet runs inside an async function (or an ES module).
try {
    await Promise.all([
        bunnyBus.publish({
            id : 23,
            event : 'message-created',
            body : 'hello world'
        }),
        bunnyBus.publish({
            id : 23,
            event : 'message-deleted'
        })
    ]);
}
catch (err) {
    // pino serializes an Error only when it is passed as the first argument.
    logger.error(err, 'failed to publish');
}
Configure and register handlers on a queue that listen for events matching topic wildcard patterns and handle them.
// Wildcard subscribe example: handler keys are topic patterns rather than exact event names.
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));
// pino exports a factory function; call it to obtain an actual logger instance.
const logger = require('pino')();
bunnyBus.logger = logger;

const handlers = {
    // `*` stands for exactly one word, e.g. `email.created`
    'email.*' : async (message, ack, reject, requeue) => {
        //do work
        await ack();
    },
    // `#` stands for zero or more words, e.g. `voicemail.created.lowpriority`
    'voicemail.#' : async (message, ack, reject, requeue) => {
        //do work
        await ack();
    }
};

// NOTE: `await` here assumes the snippet runs inside an async function (or an ES module).
try {
    await bunnyBus.subscribe('communictionQueue', handlers);
}
catch (err) {
    // pino serializes an Error only when it is passed as the first argument.
    logger.error(err, 'failed to subscribe');
}
With the above handlers registered, let's publish some events to the bus.
// Wildcard publish example: shows which routes the `email.*` and `voicemail.#` patterns catch.
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));
// pino exports a factory function; call it to obtain an actual logger instance.
const logger = require('pino')();
bunnyBus.logger = logger;

// NOTE: `await` here assumes the snippet runs inside an async function (or an ES module).
try {
    // this will make it to the queue and subscribed handler
    await bunnyBus.publish({
        id : 1001,
        event : 'email.created',
        body : 'hello world'
    });

    // this will not make it to the queue because the `email.*` wildcard
    // matches exactly one word after `email.` and this route has two
    await bunnyBus.publish({
        id : 1002,
        event : 'email.created.highpriority',
        body : 'hello world on fire'
    });

    // this will make it to the queue because the `voicemail.#` wildcard
    // matches zero or more words after `voicemail.`
    await bunnyBus.publish({
        id : 9001,
        event : 'voicemail.crated.lowpriority',
        body : 'translation was world hello'
    });
}
catch (err) {
    // pino serializes an Error only when it is passed as the first argument.
    logger.error(err, 'failed to publish');
}
The use of `SubscriptionManager` is completely optional. When debugging needs to occur in a runtime environment or during a deployment, it can be helpful to temporarily stop events from being consumed. The `SubscriptionManager` provides an entry point to stop events from being consumed without stopping the overall process. Along the same lines, the consumption of events can be restarted with the `SubscriptionManager`. For example, you could create an HTTP endpoint to pause a queue so that events aren't processed. In the handler for this hypothetical endpoint, you would invoke `SubscriptionManager` to block the target queue that the `BunnyBus` instance is subscribed to.
// HTTP endpoint that pauses consumption of the queue named in the URL.
const app = require('express')();
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));

app.get('/stopSubscription/:queue', (req, res) => {
    // Responds right after requesting the block; it does not wait for confirmation.
    bunnyBus.subscriptions.block(req.params.queue);
    res.send('success');
});
And to unblock the queues
// HTTP endpoint that resumes consumption of the queue named in the URL.
app.get('/restartSubscription/:queue', (req, res) => {
    // Responds right after requesting the unblock; it does not wait for confirmation.
    bunnyBus.subscriptions.unblock(req.params.queue);
    res.send('success');
});
// HTTP endpoint that pauses a queue and replies only once the bus reports it unsubscribed.
const app = require('express')();
const config = require('ez-config');
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus(config.get('rabbit'));

app.get('/stopSubscription/:queue', (req, res) => {
    const targetQueue = req.params.queue;

    // Blocking a queue results in it being unsubscribed, so wait for UNSUBSCRIBED_EVENT.
    // `on` + explicit removal is used instead of `once`, because `once` would drop the
    // listener on the first event even when that event belongs to a different queue.
    const onUnsubscribed = (queue) => {
        if (queue !== targetQueue) {
            return;
        }

        bunnyBus.removeListener(BunnyBus.UNSUBSCRIBED_EVENT, onUnsubscribed);
        res.send('success');
    };

    // Register the listener before triggering the block so the event cannot be missed.
    bunnyBus.on(BunnyBus.UNSUBSCRIBED_EVENT, onUnsubscribed);
    bunnyBus.subscriptions.block(targetQueue);
});
And to unblock the queues
// HTTP endpoint that resumes a queue and replies only once the bus reports it subscribed again.
app.get('/restartSubscription/:queue', (req, res) => {
    const targetQueue = req.params.queue;

    // Unblocking a queue results in it being re-subscribed, so wait for SUBSCRIBED_EVENT.
    // `on` + explicit removal is used instead of `once`, because `once` would drop the
    // listener on the first event even when that event belongs to a different queue.
    const onSubscribed = (queue) => {
        if (queue !== targetQueue) {
            return;
        }

        bunnyBus.removeListener(BunnyBus.SUBSCRIBED_EVENT, onSubscribed);
        res.send('success');
    };

    // Register the listener before triggering the unblock so the event cannot be missed.
    bunnyBus.on(BunnyBus.SUBSCRIBED_EVENT, onSubscribed);
    bunnyBus.subscriptions.unblock(targetQueue);
});
`BunnyBus` allows for logging to be used in conjunction with events. While the `logger` property is an easy way to replace the default logging mechanism with another like `pino` or `winston`, it could be overkill at times when you simply want to log one log level to the console.
// Log only info-level bus messages to the console by listening for the log event directly.
const app = require('express')();
const BunnyBus = require('bunnybus');
const bunnyBus = new BunnyBus();

bunnyBus.on(BunnyBus.LOG_INFO_EVENT, (message) => {
    console.log(message);
});