Skip to content

Commit

Permalink
Event Bus Scoping (#91)
Browse files Browse the repository at this point in the history
* initial testing

* stashing changes

* testing

* pass in already parsed obj
  • Loading branch information
jonwinton authored Oct 2, 2018
1 parent b1f3139 commit c4d9514
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
15 changes: 13 additions & 2 deletions lib/bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
const bluebird = require('bluebird'),
redis = require('redis'),
streams = require('./streams'),
{ BUS_NAMESPACE, BUS_TOPICS } = require('./constants');
{ BUS_NAMESPACE, BUS_TOPICS } = require('./constants'),
PID = process.pid,
HOSTNAME = require('os').hostname();
var client,
log = require('./services/log').setup({file: __filename});

Expand All @@ -24,7 +26,16 @@ function connect() {
*/
function disperseEvent(topic, payload) {
try {
streams[topic].write(JSON.parse(payload));
let data = JSON.parse(payload),
scopedWrite = data.pid && data.hostname;

if (scopedWrite) { // We're dealing with amphora-redis-event-bus calls
if (data.pid === PID && HOSTNAME === data.hostname) {
streams[topic].write(data.msg);
}
} else {
streams[topic].write(data);
}
} catch (e) {
log('error', `Unable to send event ${topic} to bus: ${e.message}`, { payload });
}
Expand Down
26 changes: 26 additions & 0 deletions lib/bus.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,31 @@ describe(filename, () => {
expect(logMock).toHaveBeenCalled();
expect(streams[batchEvent].write).not.toHaveBeenCalled();
});

test('works with the payload of amphora-event-bus-redis', () => {
const msg = { foo: 'bar' },
eventBusData = {
hostname: require('os').hostname(),
pid: process.pid,
msg
};

streams[objEvent].write = jest.fn();
lib.disperseEvent(objEvent, JSON.stringify(eventBusData));
expect(streams[objEvent].write).toHaveBeenCalledWith(msg);
});

test('does not works with the payload of amphora-event-bus-redis if pid or host do not match', () => {
const msg = { foo: 'bar' },
eventBusData = {
hostname: require('os').hostname() + 1,
pid: process.pid + 1,
msg
};

streams[objEvent].write = jest.fn();
lib.disperseEvent(objEvent, JSON.stringify(eventBusData));
expect(streams[objEvent].write).not.toHaveBeenCalledWith(msg);
});
});
});

0 comments on commit c4d9514

Please sign in to comment.