diff --git a/README.md b/README.md index dd56c02..e49e7b6 100644 --- a/README.md +++ b/README.md @@ -11,39 +11,90 @@ An implementation of the mediator pattern for use with RainCatcher modules. | `mediator#once( channel, callback )` | A one-time subscribtion to events in a channel | | `mediator#promise( channel )` | A promise-based API for `mediator#once` | + +### Subscription Callbacks + +When passing a `callback` to a `mediator.subscribe` function, it is necessary to return a `Promise` if the operation is asynchronous or if the response from the subscriber is required. + +```javascript +var mediator = require('fh-wfm-mediator'); +var Promise = require('bluebird'); + +mediator.subscribe("wfm:topic", function(topicData) { + + return new Promise(function(resolve, reject) { + doSomeAyncFunction(topicData, function(err, result){ + err ? reject(err) : resolve(result); + }); + }); + +}); + +//The published topic will not resolve until all of the asynchronous subscribers have resolved / rejected +//The `result` is the resolved value of the highest priority subscriber. +mediator.publish("wfm:topic").then(function(result) { + console.log("All of the subscribers have resolved.", result); +}).catch(function(err) { + console.error("An error occurred when executing topic wfm:topic", err); +}); + +``` + ### `Topics` utilities This module also provides a fluent, promise-based API for subscribing to convention and adhering to the request-response pattern used throughout the RainCatcher modules and available through `mediator#request`. -Namely if a `data:read` topic that is used to provide a feature such as reading data from a remote source asyncronously, the result of the operation is by convention published in the `done:data:read` topic, and if it results in an error, it is published to the `error:data:read` topic. - -This utility module helps with enforcing the same namespace for a set of related topics without repeating string literals or constants, and adhering to the convention above. It is available under [`lib/topics`](./lib/topics/index.js) with jsdoc comments. #### Example ```javascript var mediator = require('fh-wfm-mediator'); -var Topics = require('fh-wfm-mediator/lib/topics'); +var Topic = require('fh-wfm-mediator/lib/topics'); + +//A set of topics for saving user data +var userDataTopics = new Topic(mediator) + .prefix('wfm:data') + .entity('user') + .on('read', function(id) { + //asyncReadUser returns a Promise + return asyncReadUser(id); + }).on('update', function(userToUpdate) { + //asyncReadUser returns a Promise + return asyncUpdateUser(userToUpdate); + }); + -var topics = new Topics(mediator) +new Topic(mediator) .prefix('wfm') .entity('user') // This will subscribe to wfm:user:read - // and publish results to done:wfm:user:read:{id} - // and errors to error:wfm:user:read:{id} .on('read', function(id) { - // will request to 'data:user:read' - return this.mediator.request(['data', this.entity, 'read'].join(':'), id); + // will publish to 'wfm:user:data:read', which returns a Promise. + return userDataTopics.publish('read', id).then(function(user) { + //We have retrieved the user, we can apply any additional asynchronous operations we need when the resolving the user + return readUserGroupInformation(user.id).then(function(groupInformation) { + user.groupName = groupInformation.name; + return user; + }); + }); }) - // If you do not return a Promise from the handler function, - // you must manually publish the result to another topic so it can be consumed - .on('delete', function(id) { - var self = this; - this.mediator.request(this.entity + ':delete', id).then(function() { - self.mediator.publish('done:ui:user:deleted:' + id); - }).catch(function(e) { - self.mediator.publish('error:ui:user:deleted:' + id, e); + .on('update_location', function(id, location) { + //If we don't want to wait for the subscribers to resolve, just return null. + userDataTopics.publish('read', id).then(function(user) { + //We have retrieved the user, we can apply any additional asynchronous operations we need when the resolving the user + user.location = location; + userDataTopics.publish('update', user); }); + + return null; }); + + +mediator.publish('wfm:user:read', "userid1234").then(function(user) { + //All of the subscribers have resolved. + console.log("User read with id " + user.id + " and group name " + user.groupName); +}).catch(function(err) { + console.log("Error reading user information", err); +}); ``` ## Usage in an Angular.js client diff --git a/lib/constants.js b/lib/constants.js new file mode 100644 index 0000000..a74a867 --- /dev/null +++ b/lib/constants.js @@ -0,0 +1,5 @@ +module.exports = { + DONE_TOPIC_PREFIX: "done", + ERROR_TOPIC_PREFIX: "error", + TOPIC_SEPERATOR: ":" +}; \ No newline at end of file diff --git a/lib/mediator-spec.js b/lib/mediator-spec.js deleted file mode 100644 index 6b653f0..0000000 --- a/lib/mediator-spec.js +++ /dev/null @@ -1,139 +0,0 @@ -var mediator = require('../lib/mediator'); -var Promise = require('bluebird'); -var assert = require('chai').assert; - -var sinon = require('sinon'); -describe('mediator', function() { - describe('#subscribe',function() { - const TEST_CHANNEL = "test_channel"; - it('Should call callback',function() { - var subscribeCallback = sinon.spy(); - mediator.subscribe(TEST_CHANNEL,subscribeCallback); - mediator.publish(TEST_CHANNEL,"my_data"); - sinon.assert.calledOnce(subscribeCallback); - mediator.publish(TEST_CHANNEL,"another"); - sinon.assert.calledTwice(subscribeCallback); - }); - it('Should accept args',function() { - var subscribeCallback = sinon.stub(); - mediator.subscribe(TEST_CHANNEL,subscribeCallback); - mediator.publish(TEST_CHANNEL,false); - sinon.assert.calledOnce(subscribeCallback); - sinon.assert.calledWith(subscribeCallback,false); - }); - it('Should return args',function() { - var subscribeCb = sinon.stub().returnsArg(0); - var testNumber = 123456789; - var testArray = ['Hello','mediator',', ','how','are','you?']; - var testString = "Hello World!"; - var testObject = { - name: 'Testing Object', - value: undefined - }; - mediator.subscribe(TEST_CHANNEL,subscribeCb); - - mediator.publish(TEST_CHANNEL,false); - mediator.publish(TEST_CHANNEL,testNumber); - mediator.publish(TEST_CHANNEL,testString); - mediator.publish(TEST_CHANNEL,testArray); - mediator.publish(TEST_CHANNEL,testObject); - - assert.equal(subscribeCb.getCall(0).returnValue, false); - assert.equal(subscribeCb.getCall(1).returnValue, testNumber); - assert.equal(subscribeCb.getCall(2).returnValue, testString); - assert.equal(subscribeCb.getCall(3).returnValue, testArray); - assert.equal(subscribeCb.getCall(4).returnValue, testObject); - }); - }); - describe('#once',function() { - const TEST_CHANNEL = "once:channel"; - - it('Should be registered only once',function() { - var CB = sinon.spy(); - mediator.once(TEST_CHANNEL,CB); - mediator.publish(TEST_CHANNEL,"sample_data"); - sinon.assert.calledOnce(CB); - mediator.publish(TEST_CHANNEL,"should not be subscribed"); - sinon.assert.calledOnce(CB); - mediator.publish("not:even:valid:channel",{ - username: 'Gandalf', - message: 'You shall not pass' - }); - sinon.assert.calledOnce(CB); - }); - }); - describe('#promise',function() { - const TEST_CHANNEL = "promise:channel"; - - it('Should call delayed callback',function(done) { - var promiseCB = sinon.stub(); - mediator.promise(TEST_CHANNEL).then(promiseCB); - var promised = Promise.delay(1, "WUHU"); - mediator.publish(TEST_CHANNEL, promised); - setTimeout(function() { - sinon.assert.called(promiseCB); - sinon.assert.calledWith(promiseCB.getCall(0),"WUHU"); - done(); - }, 3); - }); - - it('Should be called only once',function(done) { - var promiseCB = sinon.stub(); - mediator.promise(TEST_CHANNEL).then(promiseCB); - var promised = Promise.delay(1, { - goodCharacters: ['Frodo','Aragorn','Legolas'], - evilOnes: ['Sauron','Saruman'] - }); - mediator.publish(TEST_CHANNEL, promised); - mediator.publish(TEST_CHANNEL, ['Another','Set','Of','Data','That','Should','Not','Be','Accepted']); - setTimeout(function() { - sinon.assert.callCount(promiseCB,1); - done(); - }, 3); - }); - - it('Should call error callback',function(done) { - var successCB = sinon.spy(); - var errorCB = sinon.spy(); - mediator.promise(TEST_CHANNEL).then(successCB, errorCB); - var rejectedData = Promise.reject(new Error('Boromir died')).delay(1); - mediator.publish(TEST_CHANNEL,rejectedData); - setTimeout(function() { - sinon.assert.notCalled(successCB); - sinon.assert.callCount(errorCB,1); - done(); - }, 3); - }); - }); - describe('#remove',function() { - const TEST_CHANNEL = "remove:channel"; - - it('Should remove all callbacks', function() { - var firstSpy = sinon.spy(); - var secondSpy = sinon.spy(); - mediator.subscribe(TEST_CHANNEL,firstSpy); - mediator.subscribe(TEST_CHANNEL,secondSpy); - mediator.publish(TEST_CHANNEL,"data"); - sinon.assert.calledOnce(firstSpy); - sinon.assert.calledOnce(secondSpy); - mediator.remove(TEST_CHANNEL); - mediator.publish(TEST_CHANNEL,"another-data"); - sinon.assert.calledOnce(firstSpy); - sinon.assert.calledOnce(secondSpy); - }); - - it('Should remove specific callback', function() { - var firstCB = sinon.spy(); - var secondCB = sinon.spy(); - mediator.subscribe(TEST_CHANNEL,firstCB); - mediator.subscribe(TEST_CHANNEL,secondCB); - mediator.publish(TEST_CHANNEL,123456); - sinon.assert.calledOnce(firstCB); - sinon.assert.calledOnce(secondCB); - mediator.remove(TEST_CHANNEL,secondCB); - mediator.publish(TEST_CHANNEL,"another portion of data"); - sinon.assert.calledTwice(firstCB); - sinon.assert.calledOnce(secondCB); - }); - }); -}); diff --git a/lib/mediator.js b/lib/mediator.js deleted file mode 100644 index 2959791..0000000 --- a/lib/mediator.js +++ /dev/null @@ -1,110 +0,0 @@ -'use strict'; - -var _ = require('lodash'); -var Mediator = require('mediator-js').Mediator; -var Promise = require('bluebird'); - -/** - * A version of {@link once} that returns a Promise - * @param {String} channel Channel identifier to wait on a single message - * @return {Promise} A promise that is fulfilled with the next published - * message in the channel - */ -Mediator.prototype.promise = function(channel, options, context) { - var self = this; - return new Promise(function(resolve) { - self.once(channel, resolve, options, context); - }); -}; - -/** - * Publishes a message on a topic and wait for a response or error - * - * By convention 'return' topics are prefixed with 'done:' and 'error:' to signal - * the result of the operation, and suffixed with a unique id to map clients in - * order to supply the results to the correct client. - * - * @param {String} topic Channel identifier to publish the initial message - * - * @param {Any} parameters The data to publish to the topic. The unique id used to publish - * the 'return' topic is extracted from this parameter according to - * the following rules: - * - `parameters.id` property, If parameters has this property - * - `parameters[0]` if parameters is an Array - * - `parameters.toString()` otherwise - * - * @param {Object} options Options object - * @param {String} options.uid Overrides the unique id from the {@link parameters} - * @param {String} options.doneTopic Base topic to subscribe for the result of the request, - * gets prefixed with 'done:' - * @param {String} options.errorTopic Base topic to subscribe for errors on the request, - * gets prefixed with 'error:' - * - * @return {Promise} A Promise that gets fulfilled with the result of the request - * or rejected with the error from the above topics - */ -Mediator.prototype.request = function(topic, parameters, options) { - var self = this; - options = options || {}; - var topics = { - request: topic, - done: options.doneTopic || 'done:' + topic, - error: options.errorTopic || 'error:' + topic - }; - var subs = {}; - - var uid = null; - if (_.has(options, 'uid')) { - uid = options.uid; - } else if (typeof parameters !== "undefined" && parameters !== null) { - if (_.has(parameters, 'id')) { - uid = parameters.id; - } else { - uid = parameters instanceof Array ? parameters[0] : parameters.toString(); - } - } - - if (uid !== null) { - topics.done += ':' + uid; - topics.error += ':' + uid; - } else { - console.error('Warning: no status topics defined for ' + topic); - } - - if (!options.timeout) { - options.timeout = 60000; - } - - function unsubscribe() { - self.remove(topics.done, subs.done.id); - self.remove(topics.error, subs.error.id); - } - - var args = [topics.request]; - if (parameters instanceof Array) { - Array.prototype.push.apply(args, parameters); - } else { - args.push(parameters); - } - - // must setup subscriptions before publish - var resultPromise = new Promise(function(resolve, reject) { - subs.done = self.once(topics.done, resolve); - subs.error = self.once(topics.error, reject); - }); - - self.publish.apply(mediator, args); - - return resultPromise - .timeout(options.timeout, new Error('Mediator request timeout for topic ' + topic)) - .finally(function() { - unsubscribe(); - }); -}; - -// Exposing topics utility -Mediator.prototype.Topics = require("./topics"); - -var mediator = new Mediator(); -mediator.Mediator = Mediator; -module.exports = mediator; diff --git a/lib/mediator/channel-spec.js b/lib/mediator/channel-spec.js new file mode 100644 index 0000000..6509163 --- /dev/null +++ b/lib/mediator/channel-spec.js @@ -0,0 +1,311 @@ +var Mediator = require("./index").Mediator, + sinon = require('sinon'), + chai = require('chai'), + expect = require('chai').expect, + sinonChai = require("sinon-chai"); + +chai.use(sinonChai); + +describe("Channel", function() { + var channel; + + beforeEach(function() { + channel = new Mediator.Channel(); + }); + + describe("Initialization", function() { + it("should set its namespace property", function() { + expect(channel.namespace).to.equal(""); + }); + + it("should set its namespace property to a given namespace", function() { + var namespacedChannel = new Mediator.Channel("test:coffee"); + expect(namespacedChannel.namespace).to.equal("test:coffee"); + }); + + it("should act like a constructor when called like a function", function() { + var fnChannel = Mediator.Channel("name"); + + expect(fnChannel).not.to.be.undefined; + }); + }); + + describe("addSubscriber", function() { + it("should add a subscriber to the collection", function() { + var spy = sinon.spy(); + channel.addSubscriber(spy); + + expect(channel._subscribers.length).to.equal(1); + }); + + it("should give subscribers an id", function() { + var spy = sinon.spy(); + channel.addSubscriber(spy); + + expect(channel._subscribers[0].id).to.not.be.undefined; + expect(channel._subscribers[0].id).to.not.equal(''); + }); + + it("should add a subscriber to the collection with context", function() { + var spy = sinon.spy(), + contextObj = {derp: "herp"}; + + channel.addSubscriber(spy, {}, contextObj); + expect(channel._subscribers[0].context).to.equal(contextObj); + }); + + it("should add a subscriber to the collection with options", function() { + var spy = sinon.spy(), + contextObj = {}, + optionsObj = {derp: "herp"}; + + channel.addSubscriber(spy, optionsObj, contextObj); + expect(channel._subscribers[0].options).to.equal(optionsObj); + }); + + it("should be able to set top priority", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + spy3 = sinon.spy(); + + channel.addSubscriber(spy); + channel.addSubscriber(spy2); + channel.addSubscriber(spy3, {priority: 1}); + + expect(channel._subscribers[0].fn).to.equal(spy); + expect(channel._subscribers[1].fn).to.equal(spy3); + expect(channel._subscribers[2].fn).to.equal(spy2); + }); + + it("should be able to set arbitrary priority", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + spy3 = sinon.spy(); + + channel.addSubscriber(spy); + channel.addSubscriber(spy2); + channel.addSubscriber(spy3, {priority: 1}); + + expect(channel._subscribers[0].fn).to.equal(spy); + expect(channel._subscribers[1].fn).to.equal(spy3); + expect(channel._subscribers[2].fn).to.equal(spy2); + }); + + it("should be able to change priority after adding it", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + spy3 = sinon.spy(); + + var sub = channel.addSubscriber(spy, {num: 1}); + channel.addSubscriber(spy2, {num: 2}); + channel.addSubscriber(spy3, {num: 3}); + + channel.setPriority(sub.id, 2); + + expect(channel._subscribers[0].fn).to.equal(spy2); + expect(channel._subscribers[1].fn).to.equal(spy3); + expect(channel._subscribers[2].fn).to.equal(spy); + + }); + }); + + describe("GetSubscriber", function() { + it("should get a subscriber by its id", function() { + var spy = sinon.spy(); + channel.addSubscriber(spy); + + expect(channel.getSubscriber(channel._subscribers[0].id)).to.not.be.undefined; + }); + }); + + describe("addChannel", function() { + it("should add a channel to the collection", function() { + var channelName = "test"; + channel.addChannel(channelName); + + expect(channel._channels[channelName]).to.not.be.undefined; + }); + }); + + describe("hasChannel", function() { + it("should return true if the channel exists", function() { + var channelName = "test"; + channel.addChannel(channelName); + + expect(channel.hasChannel(channelName)).to.equal(true); + }); + + it("should return true if the channel does not exist", function() { + var channelName = "test", + badChannelName = "herp"; + + channel.addChannel(channelName); + + expect(channel.hasChannel(badChannelName)).to.equal(false); + }); + }); + + describe("ReturnChannel", function() { + it("should return a reference to a channel by name", function() { + var channelName = "test"; + + channel.addChannel(channelName); + + expect(channel.returnChannel(channelName)).to.equal(channel._channels[channelName]); + }); + }); + + describe("removeSubscriber", function() { + it("should remove subscribers if no fn is given", function() { + var spy = sinon.spy(); + + channel.addSubscriber(spy); + expect(channel._subscribers.length).to.equal(1); + + channel.removeSubscriber(); + expect(channel._subscribers.length).to.equal(0); + }); + + it("should remove matching subscribers a valid fn is given", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + channel.addSubscriber(spy); + channel.addSubscriber(spy2); + expect(channel._subscribers.length).to.equal(2); + + channel.removeSubscriber(spy); + expect(channel._subscribers.length).to.equal(1); + expect(channel._subscribers[0].fn).to.equal(spy2); + }); + + it("should remove matching subscribers a valid id is given", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + spy3 = sinon.spy(); + + channel.addSubscriber(spy); + var sub2 = channel.addSubscriber(spy2); + expect(channel._subscribers.length).to.equal(2); + channel.addSubscriber(spy3); + expect(channel._subscribers.length).to.equal(3); + + channel.removeSubscriber(sub2.id); + expect(channel._subscribers.length).to.equal(2); + expect(channel._subscribers[0].fn).to.equal(spy); + expect(channel._subscribers[1].fn).to.equal(spy3); + }); + + it("should do nothing if an invalid fn is given", function() { + var spy = sinon.spy(), + invalidFn = "derp"; + + channel.addSubscriber(spy); + channel.addSubscriber(function() { + }); + expect(channel._subscribers.length).to.equal(2); + + channel.removeSubscriber(invalidFn); + expect(channel._subscribers.length).to.equal(2); + }); + + it("should do nothing if a non-matching fn is given", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + channel.addSubscriber(spy); + expect(channel._subscribers.length).to.equal(1); + + channel.removeSubscriber(spy2); + expect(channel._subscribers.length).to.equal(1); + }); + }); + + + describe("publish", function() { + it("should call all matching subscribers", function() { + var spy = sinon.spy(), + data = ["data"]; + + channel.addSubscriber(spy); + channel.publish(data); + + expect(spy).calledWith(data[0]); + }); + + it("should call all matching subscribers with predicates", function() { + var spy = sinon.spy(), + data = ["data"]; + + channel.addSubscriber(spy, {}, { + predicate: function(data) { + return data.length === 4; + } + }); + channel.publish(data); + + expect(spy).calledWith(data[0]); + }); + + it("should call all matching subscribers with context", function() { + var spy = sinon.spy(), + data = ["data"]; + + channel.addSubscriber(function() { + this(); + }, {}, spy); + channel.publish(data); + + expect(spy).called; + }); + + it("should call all matching for parent channels", function() { + var channelName = "test", + spy = sinon.spy(), + spy2 = sinon.spy(), + data = ["data"]; + + channel.addSubscriber(spy); + channel.addChannel(channelName); + channel._channels[channelName].addSubscriber(spy2); + + channel._channels[channelName].publish(data); + + expect(spy).calledWith(data[0]); + expect(spy2).calledWith(data[0]); + }); + + it("should call all matching subscribers with context", function() { + var spy = sinon.spy(), + data = ["data"]; + + channel.addSubscriber(function() { + this(); + }, {}, spy); + channel.publish(data); + + expect(spy).called; + }); + + it("should call subscribers in predefined priority", function() { + var sub1 = function() { + this.a += "1"; + }, + sub2 = function() { + this.a += "2"; + }, + sub3 = function() { + this.a += "3"; + }, + data = ["data"]; + this.a = "0"; + + channel.addSubscriber(sub3, {}, this); + channel.addSubscriber(sub1, {priority: 2}, this); + channel.addSubscriber(sub2, {priority: 1}, this); + channel.publish(data); + expect(this.a).to.equal("0123"); + }); + + }); +}); \ No newline at end of file diff --git a/lib/mediator/index.js b/lib/mediator/index.js new file mode 100644 index 0000000..e4b9505 --- /dev/null +++ b/lib/mediator/index.js @@ -0,0 +1,506 @@ +var _ = require('lodash'); +var Promise = require('bluebird'); +const CONSTANTS = require('../constants'); + +//Based on https://github.com/ajacksified/Mediator.js +//This change is based on an assumption that the subscribes respond with a promise. + +// We'll generate guids for class instances for easy referencing later on. +// Subscriber instances will have an id that can be refernced for quick +// lookups. + +function guidGenerator() { + var S4 = function() { + return (((1 + Math.random()) * 0x10000) | 0).toString(16).substring(1); + }; + + return (S4() + S4() + "-" + S4() + "-" + S4() + "-" + S4() + "-" + S4() + S4() + S4()); +} + +function getDoneTopic(namespace, topicUid) { + return CONSTANTS.DONE_TOPIC_PREFIX + ":" + namespace + ":" + topicUid; +} + +function getErrorTopic(namespace, topicUid) { + return CONSTANTS.ERROR_TOPIC_PREFIX + ":" + namespace + ":" + topicUid; +} + +function getErrorAndDoneTopicPromises(mediator, namespace, topicUid, timeout) { + var doneTopic = getDoneTopic(namespace, topicUid); + var errorTopic = getErrorTopic(namespace, topicUid); + + var doneTopicPromise = mediator.promise(doneTopic, {noDuplicates: true}); + var errorTopicPromise = mediator.promise(errorTopic, {noDuplicates: true}); + + var topicPromise = new Promise(function(resolve, reject) { + doneTopicPromise.then(function(createdWorkorder) { + resolve(createdWorkorder); + }); + + errorTopicPromise.then(function(error) { + reject(error); + }); + }); + + var timeoutPromise = new Promise(function(resolve, reject) { + setTimeout(function() { + reject(new Error("Timeout For Topic: " + doneTopic)); + }, timeout || 10000); + }); + + //Either one of these promises resolves/rejects or it will time out. + return Promise.race([topicPromise, timeoutPromise]).finally(function() { + mediator.remove(doneTopic); + mediator.remove(errorTopic); + }); +} + +// Subscribers are instances of Mediator Channel registrations. We generate +// an object instance so that it can be updated later on without having to +// unregister and re-register. Subscribers are constructed with a function +// to be called, options object, and context. + +function Subscriber(fn, options, context) { + if (!(this instanceof Subscriber)) { + return new Subscriber(fn, options, context); + } + + this.id = guidGenerator(); + this.fn = fn; + this.options = options; + this.context = context; + this.channel = null; +} + +// Mediator.update on a subscriber instance can update its function,context, +// or options object. It takes in an object and looks for fn, context, or +// options keys. +Subscriber.prototype.update = function(options) { + if (options) { + this.fn = options.fn || this.fn; + this.context = options.context || this.context; + this.options = options.options || this.options; + if (this.channel && this.options && this.options.priority !== undefined) { + this.channel.setPriority(this.id, this.options.priority); + } + } +}; + + +function Channel(namespace, parent) { + if (!(this instanceof Channel)) { + return new Channel(namespace); + } + + this.namespace = namespace || ""; + this._subscribers = []; + this._channels = {}; + this._parent = parent; + this.stopped = false; +} + +// A Mediator channel holds a list of sub-channels and subscribers to be fired +// when Mediator.publish is called on the Mediator instance. It also contains +// some methods to manipulate its lists of data; only setPriority and +// StopPropagation are meant to be used. The other methods should be accessed +// through the Mediator instance. +Channel.prototype.addSubscriber = function(fn, options, context) { + + //Only one subscriber is allowed in this channel. + if (options && options.noDuplicates && this._subscribers.length > 0) { + return this._subscribers[0]; + } + + var subscriber = new Subscriber(fn, options, context); + + if (options && options.priority !== undefined) { + // Cheap hack to either parse as an int or turn it into 0. Runs faster + // in many browsers than parseInt with the benefit that it won't + // return a NaN. + options.priority = options.priority >> 0; + + if (options.priority < 0) { + options.priority = 0; + } + if (options.priority >= this._subscribers.length) { + options.priority = this._subscribers.length - 1; + } + + this._subscribers.splice(options.priority, 0, subscriber); + } else { + this._subscribers.push(subscriber); + } + + subscriber.channel = this; + + return subscriber; +}; + +// The channel instance is passed as an argument to the mediator subscriber, +// and further subscriber propagation can be called with +// channel.StopPropagation(). +Channel.prototype.stopPropagation = function() { + this.stopped = true; +}; + +Channel.prototype.getSubscriber = function(identifier) { + var x = 0, + y = this._subscribers.length; + + for (x, y; x < y; x++) { + if (this._subscribers[x].id === identifier || this._subscribers[x].fn === identifier) { + return this._subscribers[x]; + } + } +}; + +// Channel.setPriority is useful in updating the order in which Subscribers +// are called, and takes an identifier (subscriber id or named function) and +// an array index. It will not search recursively through subchannels. + +Channel.prototype.setPriority = function(identifier, priority) { + var oldIndex = 0, + x = 0, + sub, firstHalf, lastHalf, y; + + for (x = 0, y = this._subscribers.length; x < y; x++) { + if (this._subscribers[x].id === identifier || this._subscribers[x].fn === identifier) { + break; + } + oldIndex++; + } + + sub = this._subscribers[oldIndex]; + firstHalf = this._subscribers.slice(0, oldIndex); + lastHalf = this._subscribers.slice(oldIndex + 1); + + this._subscribers = firstHalf.concat(lastHalf); + this._subscribers.splice(priority, 0, sub); +}; + +Channel.prototype.addChannel = function(channel) { + this._channels[channel] = new Channel((this.namespace ? this.namespace + ':' : '') + channel, this); +}; + +Channel.prototype.hasChannel = function(channel) { + return this._channels.hasOwnProperty(channel); +}; + +Channel.prototype.returnChannel = function(channel) { + return this._channels[channel]; +}; + +Channel.prototype.removeSubscriber = function(identifier) { + var x = this._subscribers.length - 1; + + // If we don't pass in an id, we're clearing all + if (!identifier) { + this._subscribers = []; + return; + } + + // Going backwards makes splicing a whole lot easier. + for (x; x >= 0; x--) { + if (this._subscribers[x].fn === identifier || this._subscribers[x].id === identifier) { + this._subscribers[x].channel = null; + this._subscribers.splice(x, 1); + } + } +}; + +// This will publish arbitrary arguments to a subscriber and then to parent +// channels. + +Channel.prototype.publish = function(data) { + var x = 0, + y = this._subscribers.length, + shouldCall = false, + subscriber, + subsBefore, subsAfter; + + var self = this; + + var promises = []; + + // Priority is preserved in the _subscribers index. + for (x, y; x < y; x++) { + // By default set the flag to false + shouldCall = false; + subscriber = this._subscribers[x]; + + if (!this.stopped) { + subsBefore = this._subscribers.length; + if (subscriber.options !== undefined && typeof subscriber.options.predicate === "function") { + if (subscriber.options.predicate.apply(subscriber.context, data)) { + // The predicate matches, the callback function should be called + shouldCall = true; + } + } else { + // There is no predicate to match, the callback should always be called + shouldCall = true; + } + } + + // Check if the callback should be called + if (shouldCall) { + // Check if the subscriber has options and if this include the calls options + if (subscriber.options && subscriber.options.calls !== undefined) { + // Decrease the number of calls left by one + subscriber.options.calls--; + // Once the number of calls left reaches zero or less we need to remove the subscriber + if (subscriber.options.calls < 1) { + this.removeSubscriber(subscriber.id); + } + } + + // Now we call the callback, if this in turns publishes to the same channel it will no longer + // cause the callback to be called as we just removed it as a subscriber + var subscriberResult = subscriber.fn.apply(subscriber.context, data); + + //If the result of the subscriber function is a promise, we are interested in the result. + //If it isn't, we are not interested in the result. + subscriberResult = _.isObject(subscriberResult) && _.isFunction(subscriberResult.then) ? subscriberResult : Promise.resolve(); + + //Handling the result + subscriberResult = subscriberResult.then(function(result) { + return result === null || result === undefined ? result : { + topic: self.namespace, + result: result + }; + }); + + promises.push(subscriberResult); + + subsAfter = this._subscribers.length; + y = subsAfter; + if (subsAfter === subsBefore - 1) { + x--; + } + } + } + + this.stopped = false; + + if (this._parent) { + this._parent.publish(data); + } + + //All related promises to this channel have been added. + return _.flatten(promises); +}; + +function Mediator() { + if (!(this instanceof Mediator)) { + return new Mediator(); + } + + this.uuid = guidGenerator(); + + this._channels = new Channel(''); +} + +// A Mediator instance is the interface through which events are registered +// and removed from publish channels. + +// Returns a channel instance based on namespace, for example +// application:chat:message:received. If readOnly is true we +// will refrain from creating non existing channels. +Mediator.prototype.getChannel = function(namespace, readOnly) { + var channel = this._channels, + namespaceHierarchy = namespace.split(':'), + x = 0, + y = namespaceHierarchy.length; + + if (namespace === '') { + return channel; + } + + if (namespaceHierarchy.length > 0) { + for (x, y; x < y; x++) { + + if (!channel.hasChannel(namespaceHierarchy[x])) { + if (readOnly) { + break; + } else { + channel.addChannel(namespaceHierarchy[x]); + } + } + + channel = channel.returnChannel(namespaceHierarchy[x]); + } + } + + return channel; +}; + +// Pass in a channel namespace, function to be called, options, and context +// to call the function in to Subscribe. It will create a channel if one +// does not exist. Options can include a predicate to determine if it +// should be called (based on the data published to it) and a priority +// index. + +Mediator.prototype.subscribe = function(channelName, fn, options, context) { + var channel = this.getChannel(channelName || "", false); + + options = options || {}; + context = context || {}; + + return channel.addSubscriber(fn, options, context); +}; + +// Pass in a channel namespace, function to be called, options, and context +// to call the function in to Subscribe. It will create a channel if one +// does not exist. Options can include a predicate to determine if it +// should be called (based on the data published to it) and a priority +// index. + +Mediator.prototype.once = function(channelName, fn, options, context) { + options = options || {}; + options.calls = 1; + + return this.subscribe(channelName, fn, options, context); +}; + +// Returns a subscriber for a given subscriber id / named function and +// channel namespace + +Mediator.prototype.getSubscriber = function(identifier, channelName) { + var channel = this.getChannel(channelName || "", true); + // We have to check if channel within the hierarchy exists and if it is + // an exact match for the requested channel + if (channel.namespace !== channelName) { + return null; + } + + return channel.getSubscriber(identifier); +}; + +// Remove a subscriber from a given channel namespace recursively based on +// a passed-in subscriber id or named function. + +Mediator.prototype.remove = function(channelName, identifier) { + var channel = this.getChannel(channelName || "", true); + if (channel.namespace !== channelName) { + return false; + } + + channel.removeSubscriber(identifier); +}; + +// Publishes arbitrary data to a given channel namespace. Channels are +// called recursively downwards; a post to application:chat will post to +// application:chat:receive and application:chat:derp:test:beta:bananas. +// Called using Mediator.publish("application:chat", [ args ]); + +Mediator.prototype.publish = function(channelName) { + var self = this; + var channel = this.getChannel(channelName || "", true); + if (channel.namespace !== channelName) { + return null; + } + + var args = Array.prototype.slice.call(arguments, 1); + + args.push(channel); + + + + //If there is a topicUid, we need to subscribe to the error and done topics + var topicUidPromise = args[0].topicUid ? getErrorAndDoneTopicPromises(this, channelName, args[0].topicUid) : Promise.resolve(); + + //All promises must be resolved for the published topic to be complete. + return Promise.all(channel.publish(args)).then(function(results) { + + return topicUidPromise.then(function(doneResult) { + //Removing any un-required results + //NOTE: The subscriber with the highest priority result is resolved. + var topicResponse = _.first(_.compact(_.flatten(results))); + topicResponse = topicResponse || {result: undefined}; + + return doneResult ? doneResult : topicResponse.result; + }); + }).then(function(result) { + if (result) { + self.publish(CONSTANTS.DONE_TOPIC_PREFIX + CONSTANTS.TOPIC_SEPERATOR + channel.namespace, result); + } + return result; + }); +}; + +Mediator.prototype.promise = function(channel, options, context) { + var self = this; + return new Promise(function(resolve) { + self.once(channel, resolve, options, context); + }); +}; + +/** + * Publishes a message on a topic and wait for a response or error + * + * @param {String} topic Channel identifier to publish the initial message + * + * @param {Any} parameters The data to publish to the topic. The unique id used to publish + * the 'return' topic is extracted from this parameter according to + * the following rules: + * - `parameters.id` property, If parameters has this property + * - `parameters[0]` if parameters is an Array + * - `parameters.toString()` otherwise + * + * @param {Object} options Options object + * @param {Number} [options.timeout] - Optional timeout for the request to finish + * + * @return {Promise} A Promise that gets fulfilled with the result of the request + * or rejected with the error from the above topics + */ +Mediator.prototype.request = function(topic, parameters, options) { + var self = this; + options = options || {}; + + if (!options.timeout) { + options.timeout = 60000; + } + + var topicUid = null; + if (_.has(options, 'uid')) { + topicUid = options.uid; + } else if (typeof parameters !== "undefined" && parameters !== null) { + if (_.has(parameters, 'id')) { + topicUid = parameters.id; + } else { + topicUid = parameters instanceof Array ? parameters[0] : parameters.toString(); + } + } + + var args = [topic]; + if (parameters instanceof Array) { + Array.prototype.push.apply(args, parameters); + } else if (parameters) { + args.push(parameters); + } + + if (_.isObject(args[1])) { + args[1].topicUid = topicUid; + } + + var publishPromise = self.publish.apply(self, args); + + return publishPromise ? publishPromise.timeout(options.timeout, new Error('Mediator request timeout for topic ' + topic)) : Promise.reject(new Error("No subscribers exist for topic " + topic)); +}; + + +// Alias some common names for easy interop +Mediator.prototype.on = Mediator.prototype.subscribe; +Mediator.prototype.bind = Mediator.prototype.subscribe; +Mediator.prototype.emit = Mediator.prototype.publish; +Mediator.prototype.trigger = Mediator.prototype.publish; +Mediator.prototype.off = Mediator.prototype.remove; + +// Finally, expose it all. + +Mediator.Channel = Channel; +Mediator.Subscriber = Subscriber; +Mediator.version = "0.9.8"; + +var mediator = new Mediator(); +mediator.Mediator = Mediator; +module.exports = mediator; diff --git a/lib/mediator/mediator-spec.js b/lib/mediator/mediator-spec.js new file mode 100644 index 0000000..1f222f5 --- /dev/null +++ b/lib/mediator/mediator-spec.js @@ -0,0 +1,787 @@ +var mediator = require('./index'); +var Mediator = mediator.Mediator; +var Promise = require('bluebird'); +var assert = require('chai').assert; +var expect = require('chai').expect; +const CONSTANTS = require('../constants'); + +var sinon = require('sinon'); +require('sinon-as-promised'); + +describe("Mediator", function() { + var mediator; + + beforeEach(function() { + mediator = new Mediator(); + }); + + describe('topicUid', function() { + const TEST_CHANNEL = "test:channel"; + const TEST_CHANNEL_CREATE = "test:channel:create"; + + afterEach(function() { + mediator.remove(TEST_CHANNEL); + mediator.remove(TEST_CHANNEL_CREATE); + }); + + it('should handle done topic UID', function(done) { + var topicCalled = false; + //This subscriber does not return a promise, instead publishing its own done topics + //We have to wait for the done topic + mediator.subscribe(TEST_CHANNEL_CREATE, function(params) { + + setTimeout(function() { + topicCalled = true; + var doneTopic = 'done:' + TEST_CHANNEL_CREATE + ":" + params.topicUid; + mediator.publish(doneTopic, "test"); + }, 300); + }); + + mediator.publish(TEST_CHANNEL_CREATE, {thing: "asd", topicUid: "topicuid"}).then(function(result) { + assert(topicCalled, "Expected the topic to be called"); + assert.equal("test", result); + done(); + }); + }); + + it('should handle error topic UID', function(done) { + var topicCalled = false; + //This subscriber does not return a promise, instead publishing its own done topics + //We have to wait for the done topic + mediator.subscribe(TEST_CHANNEL_CREATE, function(params) { + + setTimeout(function() { + topicCalled = true; + var doneTopic = 'error:' + TEST_CHANNEL_CREATE + ":" + params.topicUid; + mediator.publish(doneTopic, new Error("Meh")); + }, 300); + }); + + mediator.publish(TEST_CHANNEL_CREATE, {thing: "asd", topicUid: "topicuid"}).catch(function(err) { + assert(topicCalled, "Expected the topic to be called"); + assert.equal("Meh", err.message); + done(); + }); + }); + + it('should handle a request', function(done) { + var topicCalled = false; + //This subscriber does not return a promise, instead publishing its own done topics + //We have to wait for the done topic + mediator.subscribe(TEST_CHANNEL_CREATE, function(params) { + + setTimeout(function() { + topicCalled = true; + mediator.publish('done:' + TEST_CHANNEL + ":create:" + params.topicUid); + }, 300); + }); + + mediator.request(TEST_CHANNEL_CREATE, { + name: "thing" + }, {uid: "topicuid"}).then(function() { + assert(topicCalled, "Expected the topic to be called"); + done(); + }); + }); + + it('should handle an error request', function(done) { + var topicCalled = false; + //This subscriber does not return a promise, instead publishing its own done topics + //We have to wait for the done topic + mediator.subscribe(TEST_CHANNEL_CREATE, function(params) { + + setTimeout(function() { + topicCalled = true; + mediator.publish('error:' + TEST_CHANNEL + ":create:" + params.topicUid, new Error('Meh')); + }, 300); + }); + + mediator.request(TEST_CHANNEL_CREATE, { + name: "thing" + }, {uid: "topicuid"}).catch(function(err) { + assert(topicCalled, "Expected the topic to be called"); + assert.equal("Meh", err.message); + done(); + }); + }); + + }); + + describe('#subscribe', function() { + const TEST_CHANNEL = "test_channel"; + + afterEach(function() { + mediator.remove(TEST_CHANNEL); + }); + + it('Should call callback', function() { + var subscribeCallback = sinon.spy(); + mediator.subscribe(TEST_CHANNEL, subscribeCallback); + mediator.publish(TEST_CHANNEL, "my_data"); + sinon.assert.calledOnce(subscribeCallback); + mediator.publish(TEST_CHANNEL, "another"); + sinon.assert.calledTwice(subscribeCallback); + }); + + it('Should accept args', function() { + var subscribeCallback = sinon.stub(); + mediator.subscribe(TEST_CHANNEL, subscribeCallback); + mediator.publish(TEST_CHANNEL, false); + sinon.assert.calledOnce(subscribeCallback); + sinon.assert.calledWith(subscribeCallback, false); + }); + + it('Should return args', function() { + var subscribeCb = sinon.stub().returnsArg(0); + var testNumber = 123456789; + var testArray = ['Hello', 'mediator', ', ', 'how', 'are', 'you?']; + var testString = "Hello World!"; + var testObject = { + name: 'Testing Object', + value: undefined + }; + mediator.subscribe(TEST_CHANNEL, subscribeCb); + + mediator.publish(TEST_CHANNEL, false); + mediator.publish(TEST_CHANNEL, testNumber); + mediator.publish(TEST_CHANNEL, testString); + mediator.publish(TEST_CHANNEL, testArray); + mediator.publish(TEST_CHANNEL, testObject); + + assert.equal(subscribeCb.getCall(0).returnValue, false); + assert.equal(subscribeCb.getCall(1).returnValue, testNumber); + assert.equal(subscribeCb.getCall(2).returnValue, testString); + assert.equal(subscribeCb.getCall(3).returnValue, testArray); + assert.equal(subscribeCb.getCall(4).returnValue, testObject); + }); + + + it('should publish done topics for completed handers', function() { + var subscribeCallback = sinon.stub().resolves("VALUE"); + var subscribeDoneCallback = sinon.spy(); + + var doneTestChannel = CONSTANTS.DONE_TOPIC_PREFIX + CONSTANTS.TOPIC_SEPERATOR + TEST_CHANNEL; + mediator.subscribe(doneTestChannel, subscribeDoneCallback); + mediator.subscribe(TEST_CHANNEL, subscribeCallback); + + return mediator.publish(TEST_CHANNEL, "my_data").then(function() { + sinon.assert.calledOnce(subscribeCallback); + sinon.assert.calledOnce(subscribeDoneCallback); + }); + }); + }); + describe('#once', function() { + const TEST_CHANNEL = "once:channel"; + + it('Should be registered only once', function() { + var CB = sinon.spy(); + mediator.once(TEST_CHANNEL, CB); + mediator.publish(TEST_CHANNEL, "sample_data"); + sinon.assert.calledOnce(CB); + mediator.publish(TEST_CHANNEL, "should not be subscribed"); + sinon.assert.calledOnce(CB); + mediator.publish("not:even:valid:channel", { + username: 'Gandalf', + message: 'You shall not pass' + }); + sinon.assert.calledOnce(CB); + }); + }); + describe('#promise', function() { + const TEST_CHANNEL = "promise:channel"; + + it('Should call delayed callback', function(done) { + var promiseCB = sinon.stub(); + mediator.promise(TEST_CHANNEL).then(promiseCB); + var promised = Promise.delay(1, "WUHU"); + mediator.publish(TEST_CHANNEL, promised); + setTimeout(function() { + sinon.assert.called(promiseCB); + sinon.assert.calledWith(promiseCB.getCall(0), "WUHU"); + done(); + }, 3); + }); + + it('Should be called only once', function(done) { + var promiseCB = sinon.stub(); + mediator.promise(TEST_CHANNEL).then(promiseCB); + var promised = Promise.delay(1, { + goodCharacters: ['Frodo', 'Aragorn', 'Legolas'], + evilOnes: ['Sauron', 'Saruman'] + }); + mediator.publish(TEST_CHANNEL, promised); + mediator.publish(TEST_CHANNEL, ['Another', 'Set', 'Of', 'Data', 'That', 'Should', 'Not', 'Be', 'Accepted']); + setTimeout(function() { + sinon.assert.callCount(promiseCB, 1); + done(); + }, 3); + }); + + it('Should call error callback', function(done) { + var successCB = sinon.spy(); + var errorCB = sinon.spy(); + mediator.promise(TEST_CHANNEL).then(successCB, errorCB); + var rejectedData = Promise.reject(new Error('Boromir died')).delay(1); + mediator.publish(TEST_CHANNEL, rejectedData); + setTimeout(function() { + sinon.assert.notCalled(successCB); + sinon.assert.callCount(errorCB, 1); + done(); + }, 3); + }); + }); + describe('#remove', function() { + const TEST_CHANNEL = "remove:channel"; + + it('Should remove all callbacks', function() { + var firstSpy = sinon.spy(); + var secondSpy = sinon.spy(); + mediator.subscribe(TEST_CHANNEL, firstSpy); + mediator.subscribe(TEST_CHANNEL, secondSpy); + mediator.publish(TEST_CHANNEL, "data"); + sinon.assert.calledOnce(firstSpy); + sinon.assert.calledOnce(secondSpy); + mediator.remove(TEST_CHANNEL); + mediator.publish(TEST_CHANNEL, "another-data"); + sinon.assert.calledOnce(firstSpy); + sinon.assert.calledOnce(secondSpy); + }); + + it('Should remove specific callback', function() { + var firstCB = sinon.spy(); + var secondCB = sinon.spy(); + mediator.subscribe(TEST_CHANNEL, firstCB); + mediator.subscribe(TEST_CHANNEL, secondCB); + mediator.publish(TEST_CHANNEL, 123456); + sinon.assert.calledOnce(firstCB); + sinon.assert.calledOnce(secondCB); + mediator.remove(TEST_CHANNEL, secondCB); + mediator.publish(TEST_CHANNEL, "another portion of data"); + sinon.assert.calledTwice(firstCB); + sinon.assert.calledOnce(secondCB); + }); + }); + + + describe('#mediator promises', function() { + const TEST_CHANNEL = "test:channel"; + var value = {test: "val"}; + + beforeEach(function() { + mediator.remove(TEST_CHANNEL); + }); + + it('should handle a promise result', function(done) { + mediator.subscribe(TEST_CHANNEL, function(param) { + assert.equal(value, param); + + return Promise.resolve(param); + }); + + mediator.publish(TEST_CHANNEL, value).then(function(result) { + assert.equal(value, result); + done(); + }); + }); + + it('should handle a promise error', function(done) { + var expectedErr = new Error("Error Doing Something"); + mediator.subscribe(TEST_CHANNEL, function(param) { + assert.equal(value, param); + + return Promise.reject(expectedErr); + }); + + mediator.publish(TEST_CHANNEL, value).then(function() { + done(new Error("Did not expect to resolve")); + }).catch(function(err) { + assert.equal(expectedErr, err); + done(); + }); + }); + + it('should remove non-promise results', function(done) { + mediator.subscribe(TEST_CHANNEL, function(param) { + assert.equal(value, param); + + return param; + }); + + mediator.publish(TEST_CHANNEL, value).then(function(result) { + assert.equal(undefined, result); + done(); + }); + }); + + it('should handle multiple subscribers', function(done) { + mediator.subscribe(TEST_CHANNEL, function(param) { + assert.equal(value, param); + + return param; + }); + + mediator.subscribe(TEST_CHANNEL, function(param) { + assert.equal(value, param); + + return Promise.resolve(param); + }); + + mediator.publish(TEST_CHANNEL, value).then(function(result) { + assert.equal(value, result); + done(); + }); + }); + + it('should only handle promises on the channel it published on. Parent channels should be published to but are not considered as part of resolution.' , function() { + this.timeout(120); + var channel1 = "test:channel:one"; + var channel2 = "test:channel:one:two"; + + mediator.subscribe(channel1, function() { + return new Promise(function(resolve) { + setTimeout(resolve, 200); + }); + }); + + mediator.subscribe(channel2, function() { + return new Promise(function(resolve) { + setTimeout(resolve, 100); + }); + }); + + return mediator.publish(channel2, "TEST"); + + }); + + it('should not consider errors from parent channels', function() { + this.timeout(120); + var channel1 = "test:channel:one"; + var channel2 = "test:channel:one:two"; + + mediator.subscribe(channel1, function() { + return new Promise(function(resolve, reject) { + setTimeout(function() { + reject(new Error("Some Error")); + }, 200); + }); + }); + + mediator.subscribe(channel2, function() { + return new Promise(function(resolve) { + setTimeout(resolve, 100); + }); + }); + + return mediator.publish(channel2, "TEST"); + }); + + }); + + describe("initializing", function() { + it("should act like a constructor when called like a function", function() { + var fnMediator = Mediator(); + expect(fnMediator).not.to.be.undefined; + }); + + it("should start with a channel", function() { + expect(mediator.getChannel('')).not.to.be.undefined; + }); + }); + + describe("subscribing", function() { + it("should subscribe to a given channel", function() { + var spy = sinon.spy(); + mediator.subscribe("test", spy); + expect(mediator.getChannel("test")._subscribers.length).to.equal(1); + }); + + it("should bind 'once'", function() { + var spy = sinon.spy(); + mediator.once("test", spy); + mediator.publish("test"); + mediator.publish("test"); + + expect(spy).calledOnce; + }); + + it("should bind with arbitrary number of calls", function() { + var spy = sinon.spy(), i; + mediator.subscribe("test", spy, {calls: 3}); + + for (i = 0; i < 5; i++) { + mediator.publish("test"); + } + + expect(spy).calledThrice; + }); + + it("should bind with arbitrary number of calls when predicate matches", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + subscriber1 = mediator.subscribe("test", spy, { + calls: 3, predicate: function(d) { + return (d === 1); + } + }), + subscriber2 = mediator.subscribe("test", spy2, { + calls: 3, predicate: function(d) { + return (d === 2); + } + }); + + mediator.publish("test", 1); + mediator.publish("test", 2); + + expect(spy).calledOnce; + expect(subscriber1.options.calls).to.equal(2); + expect(subscriber2.options.calls).to.equal(2); + }); + + it("should remove a subscriber in a list of others that's been called its maximum amount of times", function() { + var spy = sinon.spy(), i; + + mediator.subscribe("test", function() { + }); + mediator.subscribe("test", spy, {calls: 3}); + mediator.subscribe("test", function() { + }); + + for (i = 0; i < 5; i++) { + mediator.publish("test"); + } + + expect(spy).calledThrice; + }); + }); + + describe("publishing", function() { + it("should call a subscriber for a given channel", function() { + var spy = sinon.spy(); + + mediator.subscribe("testX", spy); + mediator.publish("testX"); + + expect(spy).called; + }); + + it("should stop propagation if requested", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + subscriber = function(c) { + c.stopPropagation(); + spy(); + }, + subscriber2 = function() { + spy2(); + }; + + mediator.subscribe("testX", subscriber); + mediator.subscribe("testX", subscriber2); + mediator.publish("testX"); + + expect(spy).called; + expect(spy2).not.called; + }); + + + it("should call subscribers for all functions in a given channel", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + mediator.subscribe("test", spy); + mediator.subscribe("test", spy2); + mediator.publish("test"); + + expect(spy).called; + expect(spy2).called; + }); + + it("should pass arguments to the given function", function() { + var spy = sinon.spy(), + channel = "test", + arg = "arg1", + arg2 = "arg2"; + + mediator.subscribe(channel, spy); + mediator.publish(channel, arg, arg2); + + sinon.assert.calledWith(spy, arg, arg2, mediator.getChannel(channel)); + }); + + it("should call all matching predicates", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + spy3 = sinon.spy(); + + var predicate = function(data) { + return data.length === 4; + }; + + var predicate2 = function(data) { + return data[0] === "Y"; + }; + + mediator.subscribe("test", spy, {predicate: predicate}); + mediator.subscribe("test", spy2, {predicate: predicate2}); + mediator.subscribe("test", spy3); + + mediator.publish("test", "Test"); + + expect(spy).called; + expect(spy2).not.called; + expect(spy3).called; + }); + + }); + + describe("removing", function() { + it("should remove subscribers for a given channel", function() { + var spy = sinon.spy(); + + mediator.subscribe("test", spy); + mediator.remove("test"); + mediator.publish("test"); + + expect(spy).not.called; + }); + + it("should allow subscriber to remove itself", function() { + var removerCalled = false; + var predicate = function() { + return true; + }; + var remover = function() { + removerCalled = true; + mediator.remove("test", sub.id); + }; + + var spy1 = sinon.spy(); + + var sub = mediator.subscribe("test", remover, {predicate: predicate}); + mediator.subscribe("test", spy1); + mediator.publish("test"); + + expect(removerCalled).to.be.true; + expect(spy1).called; + expect(mediator.getChannel("test")._subscribers.length).to.equal(1); + }); + + it("should remove subscribers for a given channel / named function pair", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + mediator.subscribe("test", spy); + mediator.subscribe("test", spy2); + mediator.remove("test", spy); + mediator.publish("test"); + + expect(spy).not.called; + expect(spy2).called; + }); + + it("should remove subscribers by calling from subscriber's callback", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + catched = false; + mediator.subscribe("test", function() { + mediator.remove("test"); + }); + mediator.subscribe("test", spy); + mediator.subscribe("test", spy2); + try { + mediator.publish("test"); + } catch (e) { + catched = true; + } + expect(catched).to.be.false; + expect(spy).not.called; + expect(spy2).not.called; + }); + + it("should remove subscriber by calling from its callback", function() { + var remover = function() { + mediator.remove("test", sub.id); + }; + var spy = sinon.spy(), + spy2 = sinon.spy(), + catched = false; + var sub = mediator.subscribe("test", remover); + mediator.subscribe("test", spy); + mediator.subscribe("test", spy2); + try { + mediator.publish("test"); + } catch (e) { + catched = true; + } + expect(catched).to.be.false; + expect(spy).to.called; + expect(spy2).to.called; + remover = sinon.spy(remover); + mediator.publish("test"); + expect(remover).not.to.called; + expect(spy).to.called; + expect(spy2).to.called; + }); + }); + + describe("updating", function() { + it("should update subscriber by identifier", function() { + var spy = sinon.spy(), + newPredicate = function(data) { + return data; + }; + + var sub = mediator.subscribe("test", spy), + subId = sub.id; + + var subThatIReallyGotLater = mediator.getSubscriber(subId, "test"); + subThatIReallyGotLater.update({options: {predicate: newPredicate}}); + expect(subThatIReallyGotLater.options.predicate).to.equal(newPredicate); + }); + + it("should update subscriber priority by identifier", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(), + sub = mediator.subscribe("test", spy), + sub2 = mediator.subscribe("test", spy2); + + sub2.update({options: {priority: 0}}); + + expect(mediator.getChannel("test")._subscribers[0].id).to.equal(sub2.id); + expect(mediator.getChannel("test")._subscribers[1].id).to.equal(sub.id); + }); + + it("should update subscriber by fn", function() { + var spy = sinon.spy(), + newPredicate = function(data) { + return data; + }; + + mediator.subscribe("test", spy); + + var subThatIReallyGotLater = mediator.getSubscriber(spy, "test"); + subThatIReallyGotLater.update({options: {predicate: newPredicate}}); + expect(subThatIReallyGotLater.options.predicate).to.equal(newPredicate); + }); + }); + + describe("namespaces", function() { + it("should make subchannels", function() { + var spy = sinon.spy(); + mediator.subscribe("test:subchannel", spy); + expect(mediator.getChannel("test")._channels["subchannel"]._subscribers.length).to.equal(1); + }); + + it("should call all functions within a given channel namespace", function() { + var spy = sinon.spy(); + var spy2 = sinon.spy(); + + mediator.subscribe("test:channel", spy); + mediator.subscribe("test", spy2); + + mediator.publish("test:channel"); + + expect(spy).called; + expect(spy2).called; + }); + + it("should call only functions within a given channel namespace", function() { + var spy = sinon.spy(); + var spy2 = sinon.spy(); + + mediator.subscribe("test", spy); + mediator.subscribe("derp", spy2); + + mediator.publish("test"); + + expect(spy).called; + expect(spy2).not.called; + }); + + it("should remove functions within a given channel namespace", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + mediator.subscribe("test:test1", spy); + mediator.subscribe("test", spy2); + + mediator.remove("test:test1"); + + mediator.publish("test:test1"); + + expect(spy).not.called; + expect(spy2).called; + }); + + it("should publish to specific namespaces", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + mediator.subscribe("test:test1:test2", spy); + mediator.subscribe("test", spy2); + + mediator.publish("test:test1", "data"); + + expect(spy).not.called; + expect(spy2).called; + }); + + it("should publish to parents of non-existing namespaces", function() { + var spy = sinon.spy(), + spy2 = sinon.spy(); + + mediator.subscribe("test:test1:test2", spy); + mediator.subscribe("test", spy2); + + mediator.publish("test:test1", "data"); + + expect(spy).not.called; + expect(spy2).called; + }); + + }); + + describe("aliases", function() { + it("should alias 'on' and 'bind'", function() { + var spy = sinon.spy(); + + mediator.on("test", spy); + mediator.bind("test", spy); + mediator.publish("test"); + + expect(spy).calledTwice; + }); + + it("should alias 'emit' and 'trigger'", function() { + var spy = sinon.spy(); + + mediator.subscribe("test", spy); + + mediator.emit("test"); + mediator.trigger("test"); + + expect(spy).calledTwice; + }); + + it("should alias 'off' for subscriptions", function() { + var spy = sinon.spy(), + sub; + + sub = mediator.subscribe("test", spy); + mediator.off("test", sub.id); + + mediator.publish("test"); + expect(spy).not.called; + }); + + it("should alias 'off' for channels", function() { + var spy = sinon.spy(); + + mediator.subscribe("test", spy); + mediator.off("test"); + + mediator.publish("test"); + expect(spy).not.called; + }); + }); +}); diff --git a/lib/topics/index-spec.js b/lib/topics/index-spec.js index 22173ea..357fbe2 100644 --- a/lib/topics/index-spec.js +++ b/lib/topics/index-spec.js @@ -164,4 +164,57 @@ describe('Topics', function() { }); }); }); + + describe('#mediator promise', function() { + + it("should return a promise", function(done) { + this.topics.on('find', function(name) { + assert.equal('trever', name); + return {id: 'trever'}; + }); + + mediator.publish(this.topics.getTopic('find'), 'trever').then(function(result) { + assert.equal( 'trever', result.id); + done(); + }); + }); + + it("should handle multiple publishes", function(done) { + var self = this; + self.topics.on('find', function(name) { + assert.equal('trever', name); + return mediator.publish(self.topics.getTopic('test'), name); + }); + + self.topics.on('test', function(name) { + assert.equal('trever', name); + return Promise.resolve({id: 'trever'}); + }); + + mediator.publish(self.topics.getTopic('find'), 'trever').then(function(result) { + assert.equal( 'trever', result.id); + done(); + }); + }); + + }); + + + describe('#mediator publish', function() { + + it('publish the required topic', function(done) { + + var stub = sinon.stub().returns({id: 'trever'}); + + this.topics.on('find', stub); + + this.topics.publish('find', 'trever').then(function(result) { + assert.equal( 'trever', result.id); + sinon.assert.calledOnce(stub); + sinon.assert.calledWith(stub, sinon.match('trever')); + done(); + }); + }); + + }); }); \ No newline at end of file diff --git a/lib/topics/index.js b/lib/topics/index.js index ec7087d..e99c338 100644 --- a/lib/topics/index.js +++ b/lib/topics/index.js @@ -42,8 +42,8 @@ Topics.prototype.addSubscription = function(topic, fn) { /** * Builds a topic name out of the configured {@link prefix} and {@link entity} * @param {String} topicName The name of the sub-topic to build - * @param {String} prefix An optional prefix to the final topic, i.e. 'done' - * @param {String} topicUid An optional unique identifier to append + * @param {String} [prefix] An optional prefix to the final topic, i.e. 'done' + * @param {String} [topicUid] An optional unique identifier to append * @return {String} The complete topic name, * i.e. {prefix}:{this.prefix}:{this.entity}:{topicName}:{topicUid} */ @@ -87,10 +87,15 @@ function wrapInMediatorPromise(self, method, fn) { topic = [topic, error.id].join(':'); } self.mediator.publish(topic, error); + return Promise.reject(error); } return function() { - return Promise.resolve(fn.apply(self, arguments)) + var promise = fn.apply(self, arguments); + + promise = promise && _.isFunction(promise.then) ? promise : Promise.resolve(promise); + + return promise .then(publishDone) .catch(publishError); }; @@ -159,4 +164,22 @@ Topics.prototype.request = function(topic, params, options) { return this.mediator.request(this.getTopic(topic), params, options); }; +/** + * Does a {@link Mediator.request} in the context of the namespaced topics + * @param {String} topic Base topic inside the configured namespace + * @param {Any} params Data for the `request` + * @param {Object} options Options for the `request` + * @return {Promise} The result of the `request` + */ +Topics.prototype.publish = function(topic) { + + var args = Array.prototype.slice.call(arguments, 1); + + args.unshift(this.getTopic(topic)); + + return this.mediator.publish.apply(this.mediator, args); +}; + + + module.exports = Topics; \ No newline at end of file diff --git a/package.json b/package.json index ce57532..55bc43e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fh-wfm-mediator", - "version": "0.3.4", + "version": "1.0.0", "description": "An implementation of the mediator pattern for use with WFM", "main": "lib/angular/mediator-ng.js", "repository": "https://github.com/feedhenry-raincatcher/raincatcher-mediator", @@ -14,9 +14,9 @@ "author": "Brian Leathem", "license": "MIT", "dependencies": { + "bluebird": "^3.4.7", "lodash": "^4.7.0", - "mediator-js": "^0.9.9", - "bluebird": "^3.4.7" + "shortid": "^2.2.8" }, "devDependencies": { "chai": "^3.5.0", @@ -25,6 +25,8 @@ "grunt-mocha-test": "^0.13.2", "load-grunt-tasks": "^3.5.2", "mocha": "^3.2.0", - "sinon": "^1.17.7" + "sinon": "^1.17.7", + "sinon-as-promised": "^4.0.3", + "sinon-chai": "^2.11.0" } }