diff --git a/index.js b/index.js index 8228281..3d037de 100644 --- a/index.js +++ b/index.js @@ -7,23 +7,24 @@ const ClientHook = require('./lib/client-hook'); const ListenHook = require('./lib/listen-hook'); const PLUGIN_NAME = 'amqp-transport'; +const TRANSPORT_TYPE = 'amqp'; module.exports = function(opts) { var seneca = this; var so = seneca.options(); var options = seneca.util.deepextend(Defaults, so.transport, opts); - var listen = new ListenHook(seneca, options); - var client = new ClientHook(seneca, options); + var listen = new ListenHook(seneca); + var client = new ClientHook(seneca); seneca.add({ role: 'transport', hook: 'listen', - type: 'amqp' - }, listen.hook()); + type: TRANSPORT_TYPE + }, listen.hook(options)); seneca.add({ role: 'transport', hook: 'client', - type: 'amqp' - }, client.hook()); + type: TRANSPORT_TYPE + }, client.hook(options)); return { name: PLUGIN_NAME diff --git a/lib/amqp-util.js b/lib/amqp-util.js index ff5f303..c6ad84b 100644 --- a/lib/amqp-util.js +++ b/lib/amqp-util.js @@ -1,10 +1,10 @@ 'use strict'; /** - * @module amqp-util - * * Small utility module that groups functions * to resolve queues and topic names on listeners * and clients. + * + * @module lib/amqp-util */ const _ = require('lodash'); const Shortid = require('shortid'); diff --git a/lib/client-hook.js b/lib/client-hook.js index b9f24ed..edf2aec 100644 --- a/lib/client-hook.js +++ b/lib/client-hook.js @@ -1,103 +1,41 @@ 'use strict'; /** - * @module client-hook + * @module lib/client-hook */ -const _ = require('lodash'); -const Async = require('async'); +const Amqp = require('amqplib'); +const Promise = require('bluebird'); const Amqputil = require('./amqp-util'); -const Amqpuri = require('amqpuri'); -const Amqp = require('amqplib/callback_api'); - -var ClientHook = (function() { - var utils; - var self; - - function AMQPClientHook(seneca, options) { - this.seneca = seneca; - this.options = options; - utils = seneca.export('transport/utils'); - self = this; - } - - AMQPClientHook.prototype.start = function(options, done) { - return Async.auto({ - conn: function(cb) { - return Amqp.connect(options.url, options.socketOptions, cb); - }, - channel: ['conn', function(results, cb) { - var conn = results.conn; - conn.createChannel(function(err, channel) { - if (err) { - return cb(err); - } +const SenecaHook = require('./hook'); +const AMQPSenecaClient = require('./client'); + +module.exports = + class AMQPClientHook extends SenecaHook { + createTransport(args) { + return Amqp.connect(args.url, args.socketOptions) + .then((conn) => conn.createChannel()) + .then((channel) => { + var ex = args.exchange; + var qres = args.queues.response; + var queueName = Amqputil.resolveClientQueue(qres); channel.prefetch(1); - channel.on('error', done); - return cb(null, channel); + return Promise.props({ + channel, + exchange: channel.assertExchange(ex.name, ex.type, ex.options), + queue: channel.assertQueue(queueName, qres.options) + }).then((transport) => { + return { + channel: transport.channel, + exchange: transport.exchange.exchange, + queue: transport.queue.queue + }; + }); }); - }], - exchange: ['channel', function(results, cb) { - var channel = results.channel; - var ex = options.exchange; - return channel.assertExchange(ex.name, ex.type, ex.options, function(err, ok) { - return cb(err, ok.exchange); - }); - }], - resQueue: ['channel', function(results, cb) { - var channel = results.channel; - var qres = options.queues.response; - var queueName = Amqputil.resolveClientQueue(qres); - return channel.assertQueue(queueName, qres.options, function(err, ok) { - return cb(err, ok.queue); - }); - }] - }, done); - }; + } - AMQPClientHook.prototype.makeClient = function(options, transport, done) { - return utils.make_client(this.seneca, function(spec, topic, sendDone) { + createActor(args, transport, done) { transport.channel.on('error', done); + var client = new AMQPSenecaClient(this.seneca, transport, args); + return this.utils.make_client(this.seneca, client.callback(), args, done); + } - self.seneca.log.debug('client', 'subscribe', transport.resQueue, options, self.seneca); - transport.channel.consume(transport.resQueue, function(message) { - var content = message.content ? message.content.toString() : undefined; - var input = utils.parseJSON(self.seneca, 'client-' + options.type, content); - utils.handle_response(self.seneca, input, options); - }, { - noAck: true - }); - - sendDone(null, function(args, done) { - var outmsg = utils.prepare_request(this, args, done); - var outstr = utils.stringifyJSON(self.seneca, 'client-' + options.type, outmsg); - var opts = { - replyTo: transport.resQueue - }; - topic = Amqputil.resolveClientTopic(args); - transport.channel.publish(transport.exchange, topic, new Buffer(outstr), opts); - }); - - self.seneca.add('role:seneca,cmd:close', function(closeArgs, done) { - transport.channel.close(); - transport.conn.close(); - this.prior(closeArgs, done); - }); - }, options, done); - }; - - AMQPClientHook.prototype.hook = function() { - return function(args, done) { - args = self.seneca.util.clean(_.extend({}, self.options[args.type], args)); - args.url = Amqpuri.format(args); - return self.start(args, function(err, transport) { - if (err) { - return done(err); - } - return self.makeClient(args, transport, done); - }); - }; }; - - return AMQPClientHook; -})(); - -module.exports = ClientHook; diff --git a/lib/client.js b/lib/client.js new file mode 100644 index 0000000..4fc05f1 --- /dev/null +++ b/lib/client.js @@ -0,0 +1,47 @@ +'use strict'; +/** + * @module lib/client + */ +const Amqputil = require('./amqp-util'); + +module.exports = + class AMQPSenecaClient { + constructor(seneca, transport, options) { + this.seneca = seneca; + this.transport = transport; + this.options = options; + this.utils = seneca.export('transport/utils'); + } + + callback() { + return (spec, topic, sendDone) => this.awaitReply() + .done(() => sendDone(null, this.publish()), sendDone); + } + + publish() { + return (args, done) => { + var outmsg = this.utils.prepare_request(this.seneca, args, done); + var outstr = this.utils.stringifyJSON(this.seneca, 'client-' + this.options.type, outmsg); + var opts = { + replyTo: this.transport.queue + }; + var topic = Amqputil.resolveClientTopic(args); + return this.transport.channel.publish(this.transport.exchange, topic, new Buffer(outstr), opts); + }; + } + + consumeReply() { + return (message) => { + var content = message.content ? message.content.toString() : undefined; + var input = this.utils.parseJSON(this.seneca, 'client-' + this.options.type, content); + this.utils.handle_response(this.seneca, input, this.options); + }; + } + + awaitReply() { + return this.transport.channel.consume(this.transport.queue, + this.consumeReply(), { + noAck: true + }); + } + }; diff --git a/lib/hook.js b/lib/hook.js new file mode 100644 index 0000000..fb15a00 --- /dev/null +++ b/lib/hook.js @@ -0,0 +1,43 @@ +'use strict'; +/** + * @module lib/hook + */ +const Amqpuri = require('amqpuri'); + +module.exports = + class SenecaHook { + constructor(seneca) { + this.seneca = seneca; + this.utils = seneca.export('transport/utils'); + } + + addCloseCmd(transport) { + var seneca = this.seneca; + return this.seneca.add('role:seneca,cmd:close', function(args, done) { + try { + transport.channel.close(); + transport.channel.connection.close(); + } catch (e) { + // Channel already closed or closing + seneca.log.warn('Channel failed to close', e); + } finally { + this.prior(args, done); + } + }); + } + + hook(options) { + return (args, done) => { + args = this.seneca.util.clean(this.seneca.util.deepextend(options[args.type], args)); + args.url = Amqpuri.format(args); + return this.createTransport(args) + .then((transport) => { + transport.channel.on('error', done); + return Promise.all([ + this.addCloseCmd(transport), + this.createActor(args, transport, done) + ]); + }).catch(done); + }; + } + }; diff --git a/lib/listen-hook.js b/lib/listen-hook.js index ff0656b..9d5bc79 100644 --- a/lib/listen-hook.js +++ b/lib/listen-hook.js @@ -1,127 +1,49 @@ 'use strict'; /** - * @module listen-hook + * @module lib/listen-hook */ const _ = require('lodash'); -const Async = require('async'); -const Amqpuri = require('amqpuri'); +const Amqp = require('amqplib'); +const Promise = require('bluebird'); const Amqputil = require('./amqp-util'); -const Amqp = require('amqplib/callback_api'); - -var ListenHook = (function() { - var utils; - var self; - - function AMQPListenHook(seneca, options) { - this.seneca = seneca; - this.options = options; - utils = seneca.export('transport/utils'); - self = this; - } - - AMQPListenHook.prototype.start = function(options, done) { - return Async.auto({ - conn: function(cb) { - return Amqp.connect(options.url, options.socketOptions, cb); - }, - channel: ['conn', function(results, cb) { - var conn = results.conn; - conn.createChannel(function(err, channel) { - if (err) { - return cb(err); - } +const SenecaHook = require('./hook'); +const AMQPSenecaListener = require('./listener'); + +module.exports = + class AMQPListenHook extends SenecaHook { + createTransport(args) { + return Amqp.connect(args.url, args.socketOptions) + .then((conn) => conn.createChannel()) + .then((channel) => { + var ex = args.exchange; channel.prefetch(1); - channel.on('error', done); - return cb(null, channel); - }); - }], - exchange: ['channel', function(results, cb) { - var channel = results.channel; - var ex = options.exchange; - return channel.assertExchange(ex.name, ex.type, ex.options, function(err, ok) { - if (err) { - return cb(err); - } - return cb(null, ok.exchange); - }); - }], - pins: function(cb) { - return cb(null, utils.resolve_pins(options)); - }, - topics: ['pins', function(results, cb) { - var topics = Amqputil.resolveListenTopics(results.pins); - return cb(null, topics); - }], - actQueue: ['exchange', 'channel', 'pins', 'topics', function(results, cb) { - var exchange = results.exchange; - var channel = results.channel; - var qact = options.queues.action; - var queue = _.trim(options.name) || Amqputil.resolveListenQueue(results.pins, qact); - channel.assertQueue(queue, qact.options, function(err) { - if (err) { - return cb(err); - } - Async.each(results.topics, function(topic, done) { - channel.bindQueue(queue, exchange, topic, {}, function(err) { - if (err) { - return cb(err); - } - return done(); + return Promise.all([ + channel, + channel.assertExchange(ex.name, ex.type, ex.options), + this.utils.resolve_pins(args) + ]); + }) + .spread((channel, exchange, pins) => { + var topics = Amqputil.resolveListenTopics(pins); + var qact = args.queues.action; + var queue = _.trim(args.name) || Amqputil.resolveListenQueue(pins, qact); + return channel.assertQueue(queue, qact.options) + .then((q) => Promise.map(topics, + (topic) => channel.bindQueue(q.queue, exchange.exchange, topic)) + ) + .then(() => { + return { + channel, + exchange: exchange.exchange, + queue + }; }); - }, function(err) { - return cb(err, queue); - }); }); - }] - }, done); - }; - - AMQPListenHook.prototype.makeRequestHandlers = function(options, transport, done) { - transport.channel.consume(transport.actQueue, function(message) { - var content = message.content ? message.content.toString() : void 0; - var props = message.properties || {}; - var replyTo = props.replyTo; - - if (!content || !props.replyTo) { - return transport.channel.nack(message); - } + } - var data = utils.parseJSON(self.seneca, 'listen-' + options.type, content); - - utils.handle_request(self.seneca, data, options, function(out) { - if (typeof out === 'undefined' || out === null) { - return; - } - var outstr = utils.stringifyJSON(self.seneca, 'listen-' + options.type, out); - transport.channel.ack(message); - transport.channel.sendToQueue(replyTo, new Buffer(outstr)); - }); - }); - - self.seneca.add('role:seneca,cmd:close', function(closeArgs, cb) { - transport.channel.close(); - transport.conn.close(); - this.prior(closeArgs, cb); - }); - - self.seneca.log.info('listen', 'open', options, self.seneca); - return done(); + createActor(args, transport, done) { + var listener = new AMQPSenecaListener(this.seneca, transport, args); + return listener.listen() + .then(() => done()); + } }; - - AMQPListenHook.prototype.hook = function() { - return function(args, done) { - args = self.seneca.util.clean(_.extend({}, self.options[args.type], args)); - args.url = Amqpuri.format(args); - return self.start(args, function(err, transport) { - if (err) { - return done(err); - } - return self.makeRequestHandlers(args, transport, done); - }); - }; - }; - - return AMQPListenHook; -})(); - -module.exports = ListenHook; diff --git a/lib/listener.js b/lib/listener.js new file mode 100644 index 0000000..3d0474b --- /dev/null +++ b/lib/listener.js @@ -0,0 +1,40 @@ +'use strict'; +/** + * @module lib/listener + */ +module.exports = + class AMQPSenecaListener { + constructor(seneca, transport, options) { + this.seneca = seneca; + this.transport = transport; + this.options = options; + this.utils = seneca.export('transport/utils'); + } + + handleMessage(message, data) { + return this.utils.handle_request(this.seneca, data, this.options, (out) => { + if (!out) { + return; + } + var outstr = this.utils.stringifyJSON(this.seneca, `listen-${this.options.type}`, out); + this.transport.channel.sendToQueue(message.properties.replyTo, new Buffer(outstr)); + this.transport.channel.ack(message); + }); + } + + consume() { + return (message) => { + var content = message.content ? message.content.toString() : void 0; + var props = message.properties || {}; + if (!content || !props.replyTo) { + return this.transport.channel.nack(message); + } + var data = this.utils.parseJSON(this.seneca, `listen-${this.options.type}`, content); + return this.handleMessage(message, data); + }; + } + + listen() { + return this.transport.channel.consume(this.transport.queue, this.consume()); + } + }; diff --git a/package.json b/package.json index 439fdb8..38c54d9 100644 --- a/package.json +++ b/package.json @@ -25,7 +25,8 @@ "rabbitmq", "zmq", "activemq", - "microservice" + "microservice", + "rpc" ], "author": "George Haidar ", "contributors": [ @@ -33,33 +34,33 @@ ], "license": "MIT", "bugs": { - "url": "https://github.com/disintegrator/seneca-amqp-transport/issues" + "url": "https://github.com/seneca-contrib/seneca-amqp-transport/issues" }, - "homepage": "https://github.com/disintegrator/seneca-amqp-transport", + "homepage": "https://github.com/seneca-contrib/seneca-amqp-transport", "dependencies": { "amqplib": "^0.4.1", "amqpuri": "^1.0.2", - "async": "^2.0.0-rc.3", + "bluebird": "^3.4.0", "jsonic": "^0.2.2", - "lodash": "^4.11.1", - "seneca": "^2.0.1", + "lodash": "^4.13.1", + "seneca": "^2.1.0", "shortid": "^2.2.6" }, "devDependencies": { - "eslint": "^2.8.0", + "eslint": "^2.10.2", "eslint-config-seneca": "^2.0.0", "eslint-plugin-hapi": "^4.0.0", "eslint-plugin-standard": "^1.3.2", "gulp": "^3.9.1", "gulp-eslint": "^2.0.0", - "gulp-if": "^2.0.0", + "gulp-if": "^2.0.1", "gulp-istanbul": "^0.10.4", - "gulp-load-plugins": "^1.2.2", + "gulp-load-plugins": "^1.2.4", "gulp-mocha": "^2.2.0", "gulp-release": "^1.0.5", - "pre-commit": "^1.1.2" + "pre-commit": "^1.1.3" }, "pre-commit": [ "validate" ] -} \ No newline at end of file +}