diff --git a/README.md b/README.md index e6b4a8c..6b1954b 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ It is part of a lightweight microservice framework that we are cooking here at C ## Sample usage -Only basic usage is covered here, you can browse the [https://github.com/cubyn/node-carotte-amqp/tree/master/examples](examples folder) on the repo for more use-cases. +Only basic usage is covered here, you can browse the [examples folder](https://github.com/cubyn/node-carotte-amqp/tree/master/examples) on the repo for more use-cases. ### Module configuration diff --git a/package.json b/package.json index 01adb1f..25e515c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "carotte-amqp", - "version": "1.1.2", + "version": "1.1.3", "description": "AMQP wrapper for the carotte microservice framework", "main": "src/index.js", "repository": "https://github.com/cubyn/node-carotte-amqp", diff --git a/src/index.js b/src/index.js index 8ebd6fd..73ea8b0 100644 --- a/src/index.js +++ b/src/index.js @@ -48,7 +48,7 @@ function Carotte(config) { enableDeadLetter: true, autoDescribe: false, retryOnError() { - return process.env.NODE_ENV === 'production'; + return true; }, transport: emptyTransport }, config); @@ -80,13 +80,13 @@ function Carotte(config) { connexion = amqp.connect(`amqp://${config.host}`, config.connexion).then(conn => { conn.on('close', error => { - config.transport.error({ error }); + config.transport.error('amqp.connection.closed', { error }); connexion = null; channels = {}; carotte.cleanExchangeCache(); }); conn.once('error', error => { - config.transport.error({ error }); + config.transport.error('amqp.connection.error', { error }); connexion = null; channels = {}; }); @@ -114,6 +114,7 @@ function Carotte(config) { * @return {promise} return the channel created */ carotte.getChannel = function getChannel(name = '', prefetch = 0) { + prefetch = Number(prefetch); const channelKey = (prefetch > 0) ? `${name}:${prefetch}` : 0; if (channels[channelKey]) { @@ -126,13 +127,19 @@ function Carotte(config) { .then(chan => { initDebug('channel created correctly'); chan.on('close', error => { - config.transport.error({ error }); + config.transport.error('amqp.channel.closed', { + channel: channelKey, + error + }); channels[channelKey] = null; carotte.cleanExchangeCache(); }); // this allow chan to throw on errors chan.once('error', error => { - config.transport.error({ error }); + config.transport.error('amqp.channel.error', { + channel: channelKey, + error + }); channels[channelKey] = null; }); @@ -230,6 +237,8 @@ function Carotte(config) { options = Object.assign({ headers: {}, context: {} }, options, parseQualifier(qualifier)); const exchangeName = getExchangeName(options); + const rpc = options.headers['x-reply-to'] !== undefined; + const { log = true } = options; // isContentBuffer is used by internal functions who don't modify the content const buffer = options.isContentBuffer @@ -254,15 +263,16 @@ function Carotte(config) { return ok.then(() => { producerDebug(`publishing to ${options.routingKey} on ${exchangeName}`); - config.transport.log({ - context: options.context, - headers: options.headers, - data: payload, - dataLength: buffer.length, - subscriber: options.context['origin-consumer'] || '', - destination: qualifier, - rpc: options.headers['x-reply-to'] !== undefined - }); + if (log) { + config.transport.info(`${rpc ? '>> ' : '> '} ${options.type}/${options.routingKey}`, { + context: options.context, + headers: options.headers, + data: buffer.toString(), + dataLength: buffer.length, + subscriber: options.context['origin-consumer'] || '', + destination: qualifier + }); + } return chan.publish( exchangeName, options.routingKey, @@ -277,15 +287,14 @@ function Carotte(config) { }); }) .catch(err => { - config.transport.error({ + config.transport.error(`${rpc ? '>> ' : '> '} ${options.type}/${options.routingKey}`, { context: options.context, headers: options.headers, - data: payload, + data: buffer.toString(), dataLength: buffer.length, - error: err, subscriber: options.context['origin-consumer'] || '', destination: qualifier, - rpc: options.headers['x-reply-to'] !== undefined + error: err }); if (err.message.match(errorToRetryRegex)) { @@ -446,25 +455,16 @@ function Carotte(config) { consumerDebug(`message handled on ${exchangeName} by queue ${q.queue}`); const { headers } = message.properties; - const content = JSON.parse(message.content.toString()); + const messageStr = message.content.toString(); + const content = JSON.parse(messageStr); const { data, context } = content; const startTime = new Date().getTime(); + const rpc = headers['x-reply-to'] !== undefined; headers['x-origin-consumer'] = qualifier; context['origin-consumer'] = qualifier; - config.transport.log({ - deliveryTag: message.fields.deliveryTag, - context, - headers, - data, - dataLength: message.content.length, - subscriber: qualifier, - destination: '', - rpc: headers['x-reply-to'] !== undefined - }); - // execute the handler inside a try catch block return execInPromise(handler, { @@ -483,12 +483,19 @@ function Carotte(config) { }) .then(() => { consumerDebug('Handler success'); - - config.transport.info({ - deliveryTag: message.fields.deliveryTag, - context, - executionMs: new Date().getTime() - startTime - }); + // otherwise internal subscribe (rpc…) + if (qualifier) { + config.transport.info(`${rpc ? '<< ' : '< '} ${qualifier}`, { + context, + headers, + data: messageStr, + dataLength: message.content.length, + subscriber: qualifier, + destination: '', + executionMs: new Date().getTime() - startTime, + deliveryTag: message.fields.deliveryTag + }); + } return chan.ack(message); }) @@ -513,65 +520,68 @@ function Carotte(config) { return err => { return carotte.getChannel(qualifier, options.prefetch) .then(chan => { - let retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 }; + const retry = meta.retry || { max: 5, strategy: 'exponential', interval: 1 }; const currentRetry = (Number(headers['x-retry-count']) || 0) + 1; const pubOptions = messageToOptions(qualifier, message); + const rpc = headers['x-reply-to'] !== undefined; - // if custom error thrown, we want to forward it to producer - if (err.status) retry = false; - - config.transport.error({ + config.transport.error(`${rpc ? '<< ' : '< '} ${qualifier}`, { context, headers, - error: err, subscriber: qualifier, destination: '', - rpc: headers['x-reply-to'] !== undefined + error: err }); - if (retry && retry.max > 0 && currentRetry <= retry.max) { + // if custom error thrown, we want to forward it to producer + // and avoid storing it in any dead-letter queue so we return here + if (err.status) { + return carotte.replyToPublisher(message, err, context, true) + .then(chan.ack(message)); + } + + if (retry.max > 0 && currentRetry <= retry.max) { consumerDebug(`Handler error: trying again with strategy ${retry.strategy}`); const rePublishOptions = incrementRetryHeaders(pubOptions, retry); const nextCallDelay = computeNextCall(pubOptions); - setTimeout(() => { + return setTimeout(() => { carotte.publish(qualifier, rePublishOptions, message.content) .then(() => chan.ack(message)) .catch(() => chan.nack(message)); }, nextCallDelay); - } else { - if (retry && currentRetry > retry.max) { - err.status = 500; - } - consumerDebug(`Handler error: ${err.message}`); - delete pubOptions.exchange; + } - // publish the message to the dead-letter queue - carotte.saveDeadLetterIfNeeded(pubOptions, message) - .then(() => { - message.properties.headers = cleanRetryHeaders( - message.properties.headers - ); - return carotte.replyToPublisher(message, err, context, true); - }) - .then(() => chan.ack(message)) - .catch(() => chan.nack(message)); + if (retry && currentRetry > retry.max) { + err.status = 500; } + consumerDebug(`Handler error: ${err.message}`); + + // publish the message to the dead-letter queue + // remove exchange options because we manage this queue ourselves + return carotte.saveDeadLetterIfNeeded(message) + .then(() => { + message.properties.headers = cleanRetryHeaders( + message.properties.headers + ); + return carotte.replyToPublisher(message, err, context, true); + }) + .then(() => chan.ack(message)) + .catch(() => chan.nack(message)); }); }; }; /** * Publish the message to the dead letter queue according to the config - * @param {object} options - options to publish - * @param {object} content - content for dead letter + * @param {object} message - amqplib message * @return {promise} */ - carotte.saveDeadLetterIfNeeded = function saveDeadLetterIfNeeded(options, message) { + carotte.saveDeadLetterIfNeeded = function saveDeadLetterIfNeeded(message) { if (config.enableDeadLetter) { return carotte.publish(config.deadLetterQualifier, - { headers: message.properties.headers }, + { headers: message.properties.headers, isContentBuffer: true }, message.content); } return Promise.resolve(); @@ -602,7 +612,8 @@ function Carotte(config) { return carotte.publish(`direct/${headers['x-reply-to']}`, { headers: newHeaders, - context + context, + log: false }, payload); } return Promise.resolve(); diff --git a/tests/subscriber.spec.js b/tests/subscriber.spec.js index 5eb2e6f..09e0ad1 100644 --- a/tests/subscriber.spec.js +++ b/tests/subscriber.spec.js @@ -76,9 +76,9 @@ describe('subscriber', () => { }); describe('retry', () => { - let callCount = 0; it('should retry when retry is specified', done => { - carotte.subscribe('bye', () => { + let callCount = 0; + carotte.subscribe('bye', { exclusive: true }, () => { callCount++; if (callCount === 4) { setTimeout(done, 500); @@ -90,5 +90,27 @@ describe('subscriber', () => { }, { retry: { max: 3, interval: 0, strategy: 'direct' } }) .then(() => carotte.publish('bye', {})); }); + + it('should not retry when retry is specified but error with status is thrown', () => { + let callCount = 0; + return carotte.subscribe('bye2', { exclusive: true }, () => { + callCount++; + if (callCount === 1) { + throw new Error('Should not be called a second time'); + } else { + const err = new Error(); + err.status = 400; + throw err; + } + }, { retry: { max: 3, interval: 0, strategy: 'direct' } }) + .then(() => carotte.invoke('bye2', {})) + .then(() => { + throw new Error('Should not succeed'); + }) + .catch((err) => { + expect(err.status).to.be.eql(400); + expect(err.message).to.not.be.eql('Should not be called a second time'); + }); + }); }); });