diff --git a/src/ChannelWrapper.js b/src/ChannelWrapper.js index 3a88648..de20f48 100644 --- a/src/ChannelWrapper.js +++ b/src/ChannelWrapper.js @@ -163,6 +163,9 @@ export default class ChannelWrapper extends EventEmitter { // Place to store queued messages. this._messages = []; + // Place to store published, but not yet confirmed messages + this._unconfirmedMessages = []; + // True if the "worker" is busy sending messages. False if we need to // start the worker to get stuff done. this._working = false; @@ -231,6 +234,12 @@ export default class ChannelWrapper extends EventEmitter { // Can happen if channel closes while we're setting up. return; } + if (this._unconfirmedMessages.length > 0) { + // requeu any messages that were left unconfirmed when connection was lost + while (this._unconfirmedMessages.length) { + this._messages.push(this._unconfirmedMessages.shift()); + } + } // Since we just connected, publish any queued messages this._startWorker(); @@ -278,6 +287,10 @@ export default class ChannelWrapper extends EventEmitter { // Reject any unsent messages. this._messages.forEach(message => message.reject(new Error('Channel closed'))); } + if(this._unconfirmedMessages.length !== 0) { + // Reject any unconfirmed messages. + this._unconfirmedMessages.forEach(message => message.reject(new Error('Channel closed'))); + } this._connectionManager.removeListener('connect', this._onConnect); this._connectionManager.removeListener('disconnect', this._onDisconnect); @@ -311,7 +324,8 @@ export default class ChannelWrapper extends EventEmitter { } const channel = this._channel; - const message = this._messages[0]; + const message = this._messages.shift(); + this._unconfirmedMessages.push(message); Promise.resolve() .then(() => { @@ -348,29 +362,27 @@ export default class ChannelWrapper extends EventEmitter { } })(); + // Send some more! + this._publishQueuedMessages(workerNumber); + return sendPromise; }) .then( result => { - this._messages.shift(); + this._unconfirmedMessages.shift(); message.resolve(result); - - // Send some more! - this._publishQueuedMessages(workerNumber); }, err => { if(!this._channel) { // Tried to write to a closed channel. Leave the message in the queue and we'll try again when we // reconnect. + this._messages.unshift(this._unconfirmedMessages.shift()); } else { // Something went wrong trying to send this message - could be JSON.stringify failed, could be the // broker rejected the message. Either way, reject it back - this._messages.shift(); + this._unconfirmedMessages.shift(); message.reject(err); - - // Send some more! - this._publishQueuedMessages(workerNumber); } } ) diff --git a/test/ChannelWrapperTest.js b/test/ChannelWrapperTest.js index 43a618f..75c76aa 100644 --- a/test/ChannelWrapperTest.js +++ b/test/ChannelWrapperTest.js @@ -589,4 +589,27 @@ describe('ChannelWrapper', function() { return p1; }).then(() => expect(publishCalls).to.equal(2)); }); -}); \ No newline at end of file + + it('should publish enqued messages to the underlying channel without waiting for confirms', function() { + connectionManager.simulateConnect(); + let p1, p2; + const channelWrapper = new ChannelWrapper(connectionManager, { + setup(channel) { + channel.publish = sinon.stub(); + return Promise.resolve(); + } + }); + + return channelWrapper.waitForConnect() + .then(() => { + p1 = channelWrapper.publish('exchange', 'routingKey', 'msg:1'); + p2 = channelWrapper.publish('exchange', 'routingKey', 'msg:2'); + return promiseTools.delay(10); + }).then(() => { + const channel = channelWrapper._channel; + expect(channel.publish.calledTwice).to.be.true; + expect(p1).to.not.be.fulfilled; + expect(p2).to.not.be.fulfilled; + }); + }); +});