Skip to content

Commit

Permalink
Merge branch 'release/1.1.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
getvega committed Apr 5, 2017
2 parents b24d6db + c69fc7c commit 0d380ef
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 70 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ It is part of a lightweight microservice framework that we are cooking here at C

## Sample usage

Only basic usage is covered here, you can browse the [https://github.com/cubyn/node-carotte-amqp/tree/master/examples](examples folder) on the repo for more use-cases.
Only basic usage is covered here, you can browse the [examples folder](https://github.com/cubyn/node-carotte-amqp/tree/master/examples) on the repo for more use-cases.

### Module configuration

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "carotte-amqp",
"version": "1.1.2",
"version": "1.1.3",
"description": "AMQP wrapper for the carotte microservice framework",
"main": "src/index.js",
"repository": "https://github.com/cubyn/node-carotte-amqp",
Expand Down
143 changes: 77 additions & 66 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ function Carotte(config) {
enableDeadLetter: true,
autoDescribe: false,
retryOnError() {
return process.env.NODE_ENV === 'production';
return true;
},
transport: emptyTransport
}, config);
Expand Down Expand Up @@ -80,13 +80,13 @@ function Carotte(config) {

connexion = amqp.connect(`amqp://${config.host}`, config.connexion).then(conn => {
conn.on('close', error => {
config.transport.error({ error });
config.transport.error('amqp.connection.closed', { error });
connexion = null;
channels = {};
carotte.cleanExchangeCache();
});
conn.once('error', error => {
config.transport.error({ error });
config.transport.error('amqp.connection.error', { error });
connexion = null;
channels = {};
});
Expand Down Expand Up @@ -114,6 +114,7 @@ function Carotte(config) {
* @return {promise} return the channel created
*/
carotte.getChannel = function getChannel(name = '', prefetch = 0) {
prefetch = Number(prefetch);
const channelKey = (prefetch > 0) ? `${name}:${prefetch}` : 0;

if (channels[channelKey]) {
Expand All @@ -126,13 +127,19 @@ function Carotte(config) {
.then(chan => {
initDebug('channel created correctly');
chan.on('close', error => {
config.transport.error({ error });
config.transport.error('amqp.channel.closed', {
channel: channelKey,
error
});
channels[channelKey] = null;
carotte.cleanExchangeCache();
});
// this allow chan to throw on errors
chan.once('error', error => {
config.transport.error({ error });
config.transport.error('amqp.channel.error', {
channel: channelKey,
error
});
channels[channelKey] = null;
});

Expand Down Expand Up @@ -230,6 +237,8 @@ function Carotte(config) {
options = Object.assign({ headers: {}, context: {} }, options, parseQualifier(qualifier));

const exchangeName = getExchangeName(options);
const rpc = options.headers['x-reply-to'] !== undefined;
const { log = true } = options;

// isContentBuffer is used by internal functions who don't modify the content
const buffer = options.isContentBuffer
Expand All @@ -254,15 +263,16 @@ function Carotte(config) {

return ok.then(() => {
producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`);
config.transport.log({
context: options.context,
headers: options.headers,
data: payload,
dataLength: buffer.length,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier,
rpc: options.headers['x-reply-to'] !== undefined
});
if (log) {
config.transport.info(`${rpc ? '>> ' : '> '} ${options.type}/${options.routingKey}`, {
context: options.context,
headers: options.headers,
data: buffer.toString(),
dataLength: buffer.length,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier
});
}
return chan.publish(
exchangeName,
options.routingKey,
Expand All @@ -277,15 +287,14 @@ function Carotte(config) {
});
})
.catch(err => {
config.transport.error({
config.transport.error(`${rpc ? '>> ' : '> '} ${options.type}/${options.routingKey}`, {
context: options.context,
headers: options.headers,
data: payload,
data: buffer.toString(),
dataLength: buffer.length,
error: err,
subscriber: options.context['origin-consumer'] || '',
destination: qualifier,
rpc: options.headers['x-reply-to'] !== undefined
error: err
});

if (err.message.match(errorToRetryRegex)) {
Expand Down Expand Up @@ -446,25 +455,16 @@ function Carotte(config) {
consumerDebug(`message handled on ${exchangeName} by queue ${q.queue}`);
const { headers } = message.properties;

const content = JSON.parse(message.content.toString());
const messageStr = message.content.toString();
const content = JSON.parse(messageStr);

const { data, context } = content;
const startTime = new Date().getTime();
const rpc = headers['x-reply-to'] !== undefined;

headers['x-origin-consumer'] = qualifier;
context['origin-consumer'] = qualifier;

config.transport.log({
deliveryTag: message.fields.deliveryTag,
context,
headers,
data,
dataLength: message.content.length,
subscriber: qualifier,
destination: '',
rpc: headers['x-reply-to'] !== undefined
});

// execute the handler inside a try catch block
return execInPromise(handler,
{
Expand All @@ -483,12 +483,19 @@ function Carotte(config) {
})
.then(() => {
consumerDebug('Handler success');

config.transport.info({
deliveryTag: message.fields.deliveryTag,
context,
executionMs: new Date().getTime() - startTime
});
// otherwise internal subscribe (rpc…)
if (qualifier) {
config.transport.info(`${rpc ? '<< ' : '< '} ${qualifier}`, {
context,
headers,
data: messageStr,
dataLength: message.content.length,
subscriber: qualifier,
destination: '',
executionMs: new Date().getTime() - startTime,
deliveryTag: message.fields.deliveryTag
});
}

return chan.ack(message);
})
Expand All @@ -513,65 +520,68 @@ function Carotte(config) {
return err => {
return carotte.getChannel(qualifier, options.prefetch)
.then(chan => {
let retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 };
const retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 };

const currentRetry = (Number(headers['x-retry-count']) || 0) + 1;
const pubOptions = messageToOptions(qualifier, message);
const rpc = headers['x-reply-to'] !== undefined;

// if custom error thrown, we want to forward it to producer
if (err.status) retry = false;

config.transport.error({
config.transport.error(`${rpc ? '<< ' : '< '} ${qualifier}`, {
context,
headers,
error: err,
subscriber: qualifier,
destination: '',
rpc: headers['x-reply-to'] !== undefined
error: err
});

if (retry && retry.max > 0 && currentRetry <= retry.max) {
// if custom error thrown, we want to forward it to producer
// and avoid storing it in any dead-letter queue so we return here
if (err.status) {
return carotte.replyToPublisher(message, err, context, true)
.then(chan.ack(message));
}

if (retry.max > 0 && currentRetry <= retry.max) {
consumerDebug(`Handler error: trying again with strategy ${retry.strategy}`);
const rePublishOptions = incrementRetryHeaders(pubOptions, retry);
const nextCallDelay = computeNextCall(pubOptions);

setTimeout(() => {
return setTimeout(() => {
carotte.publish(qualifier, rePublishOptions, message.content)
.then(() => chan.ack(message))
.catch(() => chan.nack(message));
}, nextCallDelay);
} else {
if (retry && currentRetry > retry.max) {
err.status = 500;
}
consumerDebug(`Handler error: ${err.message}`);
delete pubOptions.exchange;
}

// publish the message to the dead-letter queue
carotte.saveDeadLetterIfNeeded(pubOptions, message)
.then(() => {
message.properties.headers = cleanRetryHeaders(
message.properties.headers
);
return carotte.replyToPublisher(message, err, context, true);
})
.then(() => chan.ack(message))
.catch(() => chan.nack(message));
if (retry && currentRetry > retry.max) {
err.status = 500;
}
consumerDebug(`Handler error: ${err.message}`);

// publish the message to the dead-letter queue
// remove exchange options because we manage this queue ourselves
return carotte.saveDeadLetterIfNeeded(message)
.then(() => {
message.properties.headers = cleanRetryHeaders(
message.properties.headers
);
return carotte.replyToPublisher(message, err, context, true);
})
.then(() => chan.ack(message))
.catch(() => chan.nack(message));
});
};
};

/**
* Publish the message to the dead letter queue according to the config
* @param {object} options - options to publish
* @param {object} content - content for dead letter
* @param {object} message - amqplib message
* @return {promise}
*/
carotte.saveDeadLetterIfNeeded = function saveDeadLetterIfNeeded(options, message) {
carotte.saveDeadLetterIfNeeded = function saveDeadLetterIfNeeded(message) {
if (config.enableDeadLetter) {
return carotte.publish(config.deadLetterQualifier,
{ headers: message.properties.headers },
{ headers: message.properties.headers, isContentBuffer: true },
message.content);
}
return Promise.resolve();
Expand Down Expand Up @@ -602,7 +612,8 @@ function Carotte(config) {

return carotte.publish(`direct/${headers['x-reply-to']}`, {
headers: newHeaders,
context
context,
log: false
}, payload);
}
return Promise.resolve();
Expand Down
26 changes: 24 additions & 2 deletions tests/subscriber.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ describe('subscriber', () => {
});

describe('retry', () => {
let callCount = 0;
it('should retry when retry is specified', done => {
carotte.subscribe('bye', () => {
let callCount = 0;
carotte.subscribe('bye', { exclusive: true }, () => {
callCount++;
if (callCount === 4) {
setTimeout(done, 500);
Expand All @@ -90,5 +90,27 @@ describe('subscriber', () => {
}, { retry: { max: 3, interval: 0, strategy: 'direct' } })
.then(() => carotte.publish('bye', {}));
});

it('should not retry when retry is specified but error with status is thrown', () => {
let callCount = 0;
return carotte.subscribe('bye2', { exclusive: true }, () => {
callCount++;
if (callCount === 1) {
throw new Error('Should not be called a second time');
} else {
const err = new Error();
err.status = 400;
throw err;
}
}, { retry: { max: 3, interval: 0, strategy: 'direct' } })
.then(() => carotte.invoke('bye2', {}))
.then(() => {
throw new Error('Should not succeed');
})
.catch((err) => {
expect(err.status).to.be.eql(400);
expect(err.message).to.not.be.eql('Should not be called a second time');
});
});
});
});

0 comments on commit 0d380ef

Please sign in to comment.