Skip to content

Commit

Permalink
Merge pull request #30 from seneca-contrib/feature/es6-classes
Browse files Browse the repository at this point in the history
ES6/2015 classes
  • Loading branch information
nfantone committed May 24, 2016
2 parents 3f9c413 + 296b9c8 commit 1c6bb66
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 229 deletions.
13 changes: 7 additions & 6 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,24 @@ const ClientHook = require('./lib/client-hook');
const ListenHook = require('./lib/listen-hook');

const PLUGIN_NAME = 'amqp-transport';
const TRANSPORT_TYPE = 'amqp';

module.exports = function(opts) {
var seneca = this;
var so = seneca.options();
var options = seneca.util.deepextend(Defaults, so.transport, opts);
var listen = new ListenHook(seneca, options);
var client = new ClientHook(seneca, options);
var listen = new ListenHook(seneca);
var client = new ClientHook(seneca);
seneca.add({
role: 'transport',
hook: 'listen',
type: 'amqp'
}, listen.hook());
type: TRANSPORT_TYPE
}, listen.hook(options));
seneca.add({
role: 'transport',
hook: 'client',
type: 'amqp'
}, client.hook());
type: TRANSPORT_TYPE
}, client.hook(options));

return {
name: PLUGIN_NAME
Expand Down
4 changes: 2 additions & 2 deletions lib/amqp-util.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict';
/**
* @module amqp-util
*
* Small utility module that groups functions
* to resolve queues and topic names on listeners
* and clients.
*
* @module lib/amqp-util
*/
const _ = require('lodash');
const Shortid = require('shortid');
Expand Down
124 changes: 31 additions & 93 deletions lib/client-hook.js
Original file line number Diff line number Diff line change
@@ -1,103 +1,41 @@
'use strict';
/**
* @module client-hook
* @module lib/client-hook
*/
const _ = require('lodash');
const Async = require('async');
const Amqp = require('amqplib');
const Promise = require('bluebird');
const Amqputil = require('./amqp-util');
const Amqpuri = require('amqpuri');
const Amqp = require('amqplib/callback_api');

var ClientHook = (function() {
var utils;
var self;

function AMQPClientHook(seneca, options) {
this.seneca = seneca;
this.options = options;
utils = seneca.export('transport/utils');
self = this;
}

AMQPClientHook.prototype.start = function(options, done) {
return Async.auto({
conn: function(cb) {
return Amqp.connect(options.url, options.socketOptions, cb);
},
channel: ['conn', function(results, cb) {
var conn = results.conn;
conn.createChannel(function(err, channel) {
if (err) {
return cb(err);
}
const SenecaHook = require('./hook');
const AMQPSenecaClient = require('./client');

module.exports =
class AMQPClientHook extends SenecaHook {
createTransport(args) {
return Amqp.connect(args.url, args.socketOptions)
.then((conn) => conn.createChannel())
.then((channel) => {
var ex = args.exchange;
var qres = args.queues.response;
var queueName = Amqputil.resolveClientQueue(qres);
channel.prefetch(1);
channel.on('error', done);
return cb(null, channel);
return Promise.props({
channel,
exchange: channel.assertExchange(ex.name, ex.type, ex.options),
queue: channel.assertQueue(queueName, qres.options)
}).then((transport) => {
return {
channel: transport.channel,
exchange: transport.exchange.exchange,
queue: transport.queue.queue
};
});
});
}],
exchange: ['channel', function(results, cb) {
var channel = results.channel;
var ex = options.exchange;
return channel.assertExchange(ex.name, ex.type, ex.options, function(err, ok) {
return cb(err, ok.exchange);
});
}],
resQueue: ['channel', function(results, cb) {
var channel = results.channel;
var qres = options.queues.response;
var queueName = Amqputil.resolveClientQueue(qres);
return channel.assertQueue(queueName, qres.options, function(err, ok) {
return cb(err, ok.queue);
});
}]
}, done);
};
}

AMQPClientHook.prototype.makeClient = function(options, transport, done) {
return utils.make_client(this.seneca, function(spec, topic, sendDone) {
createActor(args, transport, done) {
transport.channel.on('error', done);
var client = new AMQPSenecaClient(this.seneca, transport, args);
return this.utils.make_client(this.seneca, client.callback(), args, done);
}

self.seneca.log.debug('client', 'subscribe', transport.resQueue, options, self.seneca);
transport.channel.consume(transport.resQueue, function(message) {
var content = message.content ? message.content.toString() : undefined;
var input = utils.parseJSON(self.seneca, 'client-' + options.type, content);
utils.handle_response(self.seneca, input, options);
}, {
noAck: true
});

sendDone(null, function(args, done) {
var outmsg = utils.prepare_request(this, args, done);
var outstr = utils.stringifyJSON(self.seneca, 'client-' + options.type, outmsg);
var opts = {
replyTo: transport.resQueue
};
topic = Amqputil.resolveClientTopic(args);
transport.channel.publish(transport.exchange, topic, new Buffer(outstr), opts);
});

self.seneca.add('role:seneca,cmd:close', function(closeArgs, done) {
transport.channel.close();
transport.conn.close();
this.prior(closeArgs, done);
});
}, options, done);
};

AMQPClientHook.prototype.hook = function() {
return function(args, done) {
args = self.seneca.util.clean(_.extend({}, self.options[args.type], args));
args.url = Amqpuri.format(args);
return self.start(args, function(err, transport) {
if (err) {
return done(err);
}
return self.makeClient(args, transport, done);
});
};
};

return AMQPClientHook;
})();

module.exports = ClientHook;
47 changes: 47 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict';
/**
* @module lib/client
*/
const Amqputil = require('./amqp-util');

module.exports =
class AMQPSenecaClient {
constructor(seneca, transport, options) {
this.seneca = seneca;
this.transport = transport;
this.options = options;
this.utils = seneca.export('transport/utils');
}

callback() {
return (spec, topic, sendDone) => this.awaitReply()
.done(() => sendDone(null, this.publish()), sendDone);
}

publish() {
return (args, done) => {
var outmsg = this.utils.prepare_request(this.seneca, args, done);
var outstr = this.utils.stringifyJSON(this.seneca, 'client-' + this.options.type, outmsg);
var opts = {
replyTo: this.transport.queue
};
var topic = Amqputil.resolveClientTopic(args);
return this.transport.channel.publish(this.transport.exchange, topic, new Buffer(outstr), opts);
};
}

consumeReply() {
return (message) => {
var content = message.content ? message.content.toString() : undefined;
var input = this.utils.parseJSON(this.seneca, 'client-' + this.options.type, content);
this.utils.handle_response(this.seneca, input, this.options);
};
}

awaitReply() {
return this.transport.channel.consume(this.transport.queue,
this.consumeReply(), {
noAck: true
});
}
};
43 changes: 43 additions & 0 deletions lib/hook.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';
/**
* @module lib/hook
*/
const Amqpuri = require('amqpuri');

module.exports =
class SenecaHook {
constructor(seneca) {
this.seneca = seneca;
this.utils = seneca.export('transport/utils');
}

addCloseCmd(transport) {
var seneca = this.seneca;
return this.seneca.add('role:seneca,cmd:close', function(args, done) {
try {
transport.channel.close();
transport.channel.connection.close();
} catch (e) {
// Channel already closed or closing
seneca.log.warn('Channel failed to close', e);
} finally {
this.prior(args, done);
}
});
}

hook(options) {
return (args, done) => {
args = this.seneca.util.clean(this.seneca.util.deepextend(options[args.type], args));
args.url = Amqpuri.format(args);
return this.createTransport(args)
.then((transport) => {
transport.channel.on('error', done);
return Promise.all([
this.addCloseCmd(transport),
this.createActor(args, transport, done)
]);
}).catch(done);
};
}
};
Loading

0 comments on commit 1c6bb66

Please sign in to comment.