Skip to content

Commit

Permalink
Update dependencies, fix breaking changes
Browse files Browse the repository at this point in the history
* promises from amqplib are no longer bluebird promises.  Need to update
  a few instances of `thenReturn` and `asCallback` to work w/ native
  promises.

* the `replyQueue` and `exchange` parameters are now objects instead of
  strings with `{queue}` and `{exchange}` referenced as the strings

* re-ran `npm run prettier` as prettier was updated reapplying formatting

Fixes #153
  • Loading branch information
Tyler Waters committed Dec 1, 2022
1 parent 649a62e commit f473da3
Show file tree
Hide file tree
Showing 34 changed files with 12,634 additions and 7,256 deletions.
52 changes: 26 additions & 26 deletions e2e/client.e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,75 +13,75 @@ const QUEUE_NAME = 'seneca.add.cmd:test.role:client';
const EXCHANGE_NAME = 'seneca.topic';
const RK = 'cmd.test.role.client';

describe("A Seneca client with type:'amqp'", function() {
describe("A Seneca client with type:'amqp'", function () {
var ch = null;
var client = seneca
.use('..', {
exchange: {
name: EXCHANGE_NAME,
options: {
durable: false,
autoDelete: true
}
}
autoDelete: true,
},
},
})
.client({
url: process.env.AMQP_URL,
type: 'amqp',
pin: 'cmd:test,role:client'
pin: 'cmd:test,role:client',
});

before(function(done) {
seneca.ready(err => done(err));
before(function (done) {
seneca.ready((err) => done(err));
});

before(function() {
before(function () {
// Connect to the broker, (re-)declare exchange and queue used in test
// and remove any pre-existing messages from it
return amqp
.connect(process.env.AMQP_UR)
.then(conn => conn.createChannel())
.then(channel => {
.then((conn) => conn.createChannel())
.then((channel) => {
return channel
.deleteQueue(QUEUE_NAME)
.then(() =>
channel.assertQueue(QUEUE_NAME, {
durable: false,
exclusive: true
exclusive: true,
})
)
.then(() => channel.assertExchange(EXCHANGE_NAME, 'topic'))
.then(() => channel.bindQueue(QUEUE_NAME, EXCHANGE_NAME, RK))
.thenReturn(channel);
.then(() => channel);
})
.then(function(channel) {
.then(function (channel) {
ch = channel;
});
});

afterEach(function() {
afterEach(function () {
// Stop consuming from the test queue after each run
return ch.cancel(CONSUMER_TAG);
});

after(function() {
after(function () {
// Close both the channel and the connection to the AMQP broker
return ch.close().then(() => ch.connection.close());
});

after(function() {
after(function () {
seneca.close();
});

it('should publish a valid message to an AMQP queue after a call to `act()`', function(done) {
it('should publish a valid message to an AMQP queue after a call to `act()`', function (done) {
var payload = {
foo: 'bar',
life: 42
life: 42,
};

ch.consume(
QUEUE_NAME,
function(message) {
function (message) {
message.properties.should.be.an('object');
message.properties.should.have.property('correlationId').that.is.ok();
message.properties.should.have.property('replyTo').that.is.ok();
Expand All @@ -92,7 +92,7 @@ describe("A Seneca client with type:'amqp'", function() {
Object.assign(
{
cmd: 'test',
role: 'client'
role: 'client',
},
payload
)
Expand All @@ -105,19 +105,19 @@ describe("A Seneca client with type:'amqp'", function() {
client.act('cmd:test,role:client', payload);
});

it('should call the `act()` callback when replying', function(done) {
it('should call the `act()` callback when replying', function (done) {
var utils = seneca.export('transport/utils');
var payload = {
foo: 'bar',
life: 42
life: 42,
};
var respose = {
bar: 'baz'
bar: 'baz',
};

ch.consume(
QUEUE_NAME,
function(message) {
function (message) {
var data = JSON.parse(message.content.toString());
var reply = utils.prepareResponse(seneca, data);
reply.res = respose;
Expand All @@ -127,14 +127,14 @@ describe("A Seneca client with type:'amqp'", function() {
message.properties.replyTo,
Buffer.from(JSON.stringify(reply)),
{
correlationId: message.properties.correlationId
correlationId: message.properties.correlationId,
}
);
},
{ consumerTag: CONSUMER_TAG }
);

client.act('cmd:test,role:client', payload, function(err, res) {
client.act('cmd:test,role:client', payload, function (err, res) {
if (err) {
return done(err);
}
Expand Down
61 changes: 29 additions & 32 deletions e2e/listener.e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,101 +15,98 @@ const RK = 'cmd.test.role.listener';
function deleteQueue(ch, queue) {
return ch
.deleteQueue(queue)
.thenReturn(ch)
.then(() => ch)
.catch(() => ch.connection.createChannel());
}

function deleteExchange(ch, queue) {
return ch
.deleteExchange(queue)
.thenReturn(ch)
.then(() => ch)
.catch(() => ch.connection.createChannel());
}

describe("A Seneca listener with type:'amqp'", function() {
describe("A Seneca listener with type:'amqp'", function () {
var ch = null;
var listener = seneca.use('..', {
exchange: {
name: EXCHANGE_NAME,
options: {
durable: false,
autoDelete: true
}
autoDelete: true,
},
},
listener: {
queues: {
options: {
durable: false,
exclusive: true
}
}
}
exclusive: true,
},
},
},
});

before(function() {
before(function () {
// Connect to the broker, delete queue and exchange, and then declare
// the actual listener. This is to properly test creation of needed AMQP
// elements during listener initialization.
return amqp
.connect(process.env.AMQP_URL)
.then(conn => conn.createChannel())
.then(channel => deleteQueue(channel, QUEUE_NAME))
.then(channel => deleteExchange(channel, EXCHANGE_NAME))
.then(function(channel) {
.then((conn) => conn.createChannel())
.then((channel) => deleteQueue(channel, QUEUE_NAME))
.then((channel) => deleteExchange(channel, EXCHANGE_NAME))
.then(function (channel) {
seneca.listen({
type: 'amqp',
url: process.env.AMQP_URL,
pin: 'cmd:test,role:listener'
pin: 'cmd:test,role:listener',
});
ch = channel;
});
});

before(function(done) {
seneca.ready(() => done());
before(function (done) {
seneca.ready(done);
});

after(function() {
after(function () {
// Close both the channel and the connection to the AMQP broker
// Declared queue and exchange should be automatically deleted on
// disconnection
return ch.close().then(() => ch.connection.close());
});

after(function() {
after(function () {
seneca.close();
});

it('should declare a new properly named queue in the broker', function(done) {
ch
.checkQueue(QUEUE_NAME)
.then(function(ok) {
ok.queue.should.eq(QUEUE_NAME);
})
.asCallback(done);
it('should declare a new properly named queue in the broker', function () {
return ch.checkQueue(QUEUE_NAME).then(function (ok) {
ok.queue.should.eq(QUEUE_NAME);
});
});

it('should declare an exchange in the broker', function(done) {
ch.checkExchange(EXCHANGE_NAME).asCallback(done);
it('should declare an exchange in the broker', function () {
return ch.checkExchange(EXCHANGE_NAME);
});

it('should call the `add()` callback when a new message is published', function(done) {
it('should call the `add()` callback when a new message is published', function (done) {
var message = {
kind: 'act',
time: { client_sent: Date.now() },
act: { cmd: 'test' },
sync: true
sync: true,
};

listener.add('cmd:test', function(payload, cb) {
listener.add('cmd:test', function (payload, cb) {
var received = seneca.util.clean(payload);
received.should.eql(message.act);
cb(null, { ok: true });
return done();
});

ch.publish(EXCHANGE_NAME, RK, Buffer.from(JSON.stringify(message)), {
replyTo: 'reply.queue'
replyTo: 'reply.queue',
});
});
});
16 changes: 7 additions & 9 deletions examples/client.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
#!/usr/bin/env node
'use strict';

const client = require('seneca')()
.use('..')
.client({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL
});
const client = require('seneca')().use('..').client({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL,
});

setInterval(function() {
setInterval(function () {
client.act(
'cmd:salute',
{
name: 'World',
max: 100,
min: 25
min: 25,
},
(err, res) => {
if (err) {
Expand Down
8 changes: 4 additions & 4 deletions examples/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ const path = require('path');

require('seneca')()
.use('..')
.add('cmd:salute', function(message, done) {
.add('cmd:salute', function (message, done) {
return done(null, {
id:
Math.floor(Math.random() * (message.max - message.min + 1)) +
message.min,
message: `Hello ${message.name}!`,
from: {
pid: process.pid,
file: path.relative(process.cwd(), __filename)
file: path.relative(process.cwd(), __filename),
},
now: Date.now()
now: Date.now(),
});
})
.listen({
type: 'amqp',
pin: 'cmd:salute',
url: process.env.AMQP_URL
url: process.env.AMQP_URL,
});
16 changes: 7 additions & 9 deletions examples/log-client.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
#!/usr/bin/env node
'use strict';

const client = require('seneca')()
.use('..')
.client({
type: 'amqp',
pin: 'cmd:log,level:log',
url: process.env.AMQP_URL
});
const client = require('seneca')().use('..').client({
type: 'amqp',
pin: 'cmd:log,level:log',
url: process.env.AMQP_URL,
});

setInterval(function() {
setInterval(function () {
client.act(
'cmd:log,level:log',
{
message: 'Hello World'
message: 'Hello World',
},
(err, res) => {
if (err) {
Expand Down
4 changes: 2 additions & 2 deletions examples/log-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

require('seneca')()
.use('..')
.add('cmd:log,level:*', function(req, done) {
.add('cmd:log,level:*', function (req, done) {
console[req.level](req.message);
return done(null, { ok: true, when: Date.now() });
})
.listen({
type: 'amqp',
pin: 'cmd:log,level:*',
url: process.env.AMQP_URL
url: process.env.AMQP_URL,
});
Loading

0 comments on commit f473da3

Please sign in to comment.