Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dependencies, fix breaking changes #154

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .prettierrc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{
"singleQuote": true
"singleQuote": true,
"trailingComma": "none",
"arrowParens": "avoid"
}
26 changes: 13 additions & 13 deletions e2e/client.e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const QUEUE_NAME = 'seneca.add.cmd:test.role:client';
const EXCHANGE_NAME = 'seneca.topic';
const RK = 'cmd.test.role.client';

describe("A Seneca client with type:'amqp'", function() {
describe("A Seneca client with type:'amqp'", function () {
var ch = null;
var client = seneca
.use('..', {
Expand All @@ -31,11 +31,11 @@ describe("A Seneca client with type:'amqp'", function() {
pin: 'cmd:test,role:client'
});

before(function(done) {
before(function (done) {
seneca.ready(err => done(err));
});

before(function() {
before(function () {
// Connect to the broker, (re-)declare exchange and queue used in test
// and remove any pre-existing messages from it
return amqp
Expand All @@ -52,36 +52,36 @@ describe("A Seneca client with type:'amqp'", function() {
)
.then(() => channel.assertExchange(EXCHANGE_NAME, 'topic'))
.then(() => channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, RK))
.thenReturn(channel);
.then(() => channel);
})
.then(function(channel) {
.then(function (channel) {
ch = channel;
});
});

afterEach(function() {
afterEach(function () {
// Stop consuming from the test queue after each run
return ch.cancel(CONSUMER_TAG);
});

after(function() {
after(function () {
// Close both the channel and the connection to the AMQP broker
return ch.close().then(() => ch.connection.close());
});

after(function() {
after(function () {
seneca.close();
});

it('should publish a valid message to an AMQP queue after a call to `act()`', function(done) {
it('should publish a valid message to an AMQP queue after a call to `act()`', function (done) {
var payload = {
foo: 'bar',
life: 42
};

ch.consume(
QUEUE_NAME,
function(message) {
function (message) {
message.properties.should.be.an('object');
message.properties.should.have.property('correlationId').that.is.ok();
message.properties.should.have.property('replyTo').that.is.ok();
Expand All @@ -105,7 +105,7 @@ describe("A Seneca client with type:'amqp'", function() {
client.act('cmd:test,role:client', payload);
});

it('should call the `act()` callback when replying', function(done) {
it('should call the `act()` callback when replying', function (done) {
var utils = seneca.export('transport/utils');
var payload = {
foo: 'bar',
Expand All @@ -117,7 +117,7 @@ describe("A Seneca client with type:'amqp'", function() {

ch.consume(
QUEUE_NAME,
function(message) {
function (message) {
var data = JSON.parse(message.content.toString());
var reply = utils.prepareResponse(seneca, data);
reply.res = respose;
Expand All @@ -134,7 +134,7 @@ describe("A Seneca client with type:'amqp'", function() {
{ consumerTag: CONSUMER_TAG }
);

client.act('cmd:test,role:client', payload, function(err, res) {
client.act('cmd:test,role:client', payload, function (err, res) {
if (err) {
return done(err);
}
Expand Down
37 changes: 17 additions & 20 deletions e2e/listener.e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ const RK = 'cmd.test.role.listener';
function deleteQueue(ch, queue) {
return ch
.deleteQueue(queue)
.thenReturn(ch)
.then(() => ch)
.catch(() => ch.connection.createChannel());
}

function deleteExchange(ch, queue) {
return ch
.deleteExchange(queue)
.thenReturn(ch)
.then(() => ch)
.catch(() => ch.connection.createChannel());
}

describe("A Seneca listener with type:'amqp'", function() {
describe("A Seneca listener with type:'amqp'", function () {
var ch = null;
var listener = seneca.use('..', {
exchange: {
Expand All @@ -46,7 +46,7 @@ describe("A Seneca listener with type:'amqp'", function() {
}
});

before(function() {
before(function () {
// Connect to the broker, delete queue and exchange, and then declare
// the actual listener. This is to properly test creation of needed AMQP
// elements during listener initialization.
Expand All @@ -55,7 +55,7 @@ describe("A Seneca listener with type:'amqp'", function() {
.then(conn => conn.createChannel())
.then(channel => deleteQueue(channel, QUEUE_NAME))
.then(channel => deleteExchange(channel, EXCHANGE_NAME))
.then(function(channel) {
.then(function (channel) {
seneca.listen({
type: 'amqp',
url: process.env.AMQP_URL,
Expand All @@ -65,43 +65,40 @@ describe("A Seneca listener with type:'amqp'", function() {
});
});

before(function(done) {
seneca.ready(() => done());
before(function (done) {
seneca.ready(done);
});

after(function() {
after(function () {
// Close both the channel and the connection to the AMQP broker
// Declared queue and exchange should be automatically deleted on
// disconnection
return ch.close().then(() => ch.connection.close());
});

after(function() {
after(function () {
seneca.close();
});

it('should declare a new properly named queue in the broker', function(done) {
ch
.checkQueue(QUEUE_NAME)
.then(function(ok) {
ok.queue.should.eq(QUEUE_NAME);
})
.asCallback(done);
it('should declare a new properly named queue in the broker', function () {
return ch.checkQueue(QUEUE_NAME).then(function (ok) {
ok.queue.should.eq(QUEUE_NAME);
});
});

it('should declare an exchange in the broker', function(done) {
ch.checkExchange(EXCHANGE_NAME).asCallback(done);
it('should declare an exchange in the broker', function () {
return ch.checkExchange(EXCHANGE_NAME);
});

it('should call the `add()` callback when a new message is published', function(done) {
it('should call the `add()` callback when a new message is published', function (done) {
var message = {
kind: 'act',
time: { client_sent: Date.now() },
act: { cmd: 'test' },
sync: true
};

listener.add('cmd:test', function(payload, cb) {
listener.add('cmd:test', function (payload, cb) {
var received = seneca.util.clean(payload);
received.should.eql(message.act);
cb(null, { ok: true });
Expand Down
14 changes: 6 additions & 8 deletions examples/client.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
#!/usr/bin/env node
'use strict';

const client = require('seneca')()
.use('..')
.client({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL
});
const client = require('seneca')().use('..').client({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL
});

setInterval(function() {
setInterval(function () {
client.act(
'cmd:salute',
{
Expand Down
2 changes: 1 addition & 1 deletion examples/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const path = require('path');

require('seneca')()
.use('..')
.add('cmd:salute', function(message, done) {
.add('cmd:salute', function (message, done) {
return done(null, {
id:
Math.floor(Math.random() * (message.max - message.min + 1)) +
Expand Down
14 changes: 6 additions & 8 deletions examples/log-client.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
#!/usr/bin/env node
'use strict';

const client = require('seneca')()
.use('..')
.client({
type: 'amqp',
pin: 'cmd:log,level:log',
url: process.env.AMQP_URL
});
const client = require('seneca')().use('..').client({
type: 'amqp',
pin: 'cmd:log,level:log',
url: process.env.AMQP_URL
});

setInterval(function() {
setInterval(function () {
client.act(
'cmd:log,level:log',
{
Expand Down
2 changes: 1 addition & 1 deletion examples/log-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

require('seneca')()
.use('..')
.add('cmd:log,level:*', function(req, done) {
.add('cmd:log,level:*', function (req, done) {
console[req.level](req.message);
return done(null, { ok: true, when: Date.now() });
})
Expand Down
2 changes: 1 addition & 1 deletion examples/multipin-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const client = require('seneca')()
url: process.env.AMQP_URL
});

setInterval(function() {
setInterval(function () {
client.act(
'action:get_time',
{
Expand Down
6 changes: 3 additions & 3 deletions examples/multipin-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

require('seneca')()
.use('..')
.add('action:get_time', function(message, done) {
.add('action:get_time', function (message, done) {
console.log(`[action:get_time] Action ${message.id} received`);
return done(null, {
pid: process.pid,
time: 'Current time is ' + Date.now() + 'ms'
});
})
.add('level:log', function(message, done) {
.add('level:log', function (message, done) {
console[message.level](
`[level:log] Action ${message.id} wants to log: ${message.text}`
);
Expand All @@ -19,7 +19,7 @@ require('seneca')()
status: `Message ${message.id} logged successfully`
});
})
.add('proc:status', function(message, done) {
.add('proc:status', function (message, done) {
console.log(`[action:status] Action ${message.id} received`);
return done(null, {
pid: process.pid,
Expand Down
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const PLUGIN_NAME = 'amqp-transport';
const PLUGIN_TAG = require('./package.json').version;
const TRANSPORT_TYPE = 'amqp';

module.exports = function(opts) {
module.exports = function (opts) {
var seneca = this;
var so = seneca.options();
var options = seneca.util.deepextend(defaults, so.transport, opts);
Expand Down
13 changes: 8 additions & 5 deletions lib/client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@ function declareRoute(ch, options) {
const queueName = amqputil.resolveClientQueue(qclient);
ch.prefetch(options.client.channel.prefetch);
return Promise.props({
exchange: ch.assertExchange(ex.name, ex.type, ex.options).get('exchange'),
queue: ch.assertQueue(queueName, qclient.options).get('queue')
});
exchange: ch.assertExchange(ex.name, ex.type, ex.options),
queue: ch.assertQueue(queueName, qclient.options)
}).then(({ exchange, queue }) => ({
exchange: exchange.exchange,
queue: queue.queue
}));
}

/**
Expand All @@ -48,7 +51,7 @@ function declareRoute(ch, options) {
*/
function createActor(seneca, { ch, queue, exchange, options }, done) {
const client = Client(seneca, { ch, queue, exchange, options });
return Promise.resolve(client.start(done)).thenReturn(client);
return Promise.resolve(client.start(done)).then(() => client);
}

/**
Expand All @@ -63,7 +66,7 @@ function createActor(seneca, { ch, queue, exchange, options }, done) {
* declared.
*/
function setup(seneca, { ch, options }, done) {
return declareRoute(ch, options).then(function({ queue, exchange }) {
return declareRoute(ch, options).then(function ({ queue, exchange }) {
return createActor(seneca, { ch, queue, exchange, options }, done);
});
}
2 changes: 1 addition & 1 deletion lib/common/dead-letter.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function declareDeadLetterExchange(channel, opt) {
function bindDeadLetterQueue(dlq, dlx, channel) {
return channel
.bindQueue(dlq.queue, dlx.exchange, ROUTING_KEY)
.thenReturn({ rk: ROUTING_KEY });
.then(() => ({ rk: ROUTING_KEY }));
}

/**
Expand Down
5 changes: 4 additions & 1 deletion lib/common/hooker.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ function closer(ch, done) {
return ch
.close()
.then(() => ch.connection.close())
.asCallback(done);
.then(
res => done(null, res),
err => done(err)
);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions lib/listener/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ function declareRoute(seneca, { ch, options }) {
ch.bindQueue(q.queue, exchange.exchange, topic)
)
)
.thenReturn({ exchange: exchange.exchange, queue });
.then(() => ({ exchange: exchange.exchange, queue }));
});
}

Expand All @@ -74,7 +74,7 @@ function createActor(seneca, { ch, queue, options }, done) {
return listener
.listen()
.then(() => done())
.thenReturn(listener);
.then(() => listener);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion lib/listener/listener-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function createListener(seneca, { ch, queue, options = {} }) {

function handleMessage(message, replyWith) {
const data = utils.parseJSON(seneca, `listen-${options.type}`, message);
return utils.handle_request(seneca, data, options, function(out) {
return utils.handle_request(seneca, data, options, function (out) {
// Here, `out` represents the reply from the Seneca add function
// Should be sent as response to the `replyTo` callback queue
if (!out) {
Expand Down
Loading