Skip to content

Commit

Permalink
Merge pull request #84 from ahem/do-not-block-while-waiting-for-confirm
Browse files Browse the repository at this point in the history
Do not block while waiting for previous message to be confirmed
  • Loading branch information
jwalton authored May 1, 2019
2 parents fb64d55 + 0a96f85 commit ceacfb5
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 10 deletions.
30 changes: 21 additions & 9 deletions src/ChannelWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ export default class ChannelWrapper extends EventEmitter {
// Place to store queued messages.
this._messages = [];

// Place to store published, but not yet confirmed messages
this._unconfirmedMessages = [];

// True if the "worker" is busy sending messages. False if we need to
// start the worker to get stuff done.
this._working = false;
Expand Down Expand Up @@ -231,6 +234,12 @@ export default class ChannelWrapper extends EventEmitter {
// Can happen if channel closes while we're setting up.
return;
}
if (this._unconfirmedMessages.length > 0) {
// requeu any messages that were left unconfirmed when connection was lost
while (this._unconfirmedMessages.length) {
this._messages.push(this._unconfirmedMessages.shift());
}
}

// Since we just connected, publish any queued messages
this._startWorker();
Expand Down Expand Up @@ -278,6 +287,10 @@ export default class ChannelWrapper extends EventEmitter {
// Reject any unsent messages.
this._messages.forEach(message => message.reject(new Error('Channel closed')));
}
if(this._unconfirmedMessages.length !== 0) {
// Reject any unconfirmed messages.
this._unconfirmedMessages.forEach(message => message.reject(new Error('Channel closed')));
}

this._connectionManager.removeListener('connect', this._onConnect);
this._connectionManager.removeListener('disconnect', this._onDisconnect);
Expand Down Expand Up @@ -311,7 +324,8 @@ export default class ChannelWrapper extends EventEmitter {
}

const channel = this._channel;
const message = this._messages[0];
const message = this._messages.shift();
this._unconfirmedMessages.push(message);

Promise.resolve()
.then(() => {
Expand Down Expand Up @@ -348,29 +362,27 @@ export default class ChannelWrapper extends EventEmitter {
}
})();

// Send some more!
this._publishQueuedMessages(workerNumber);

return sendPromise;
})
.then(
result => {
this._messages.shift();
this._unconfirmedMessages.shift();
message.resolve(result);

// Send some more!
this._publishQueuedMessages(workerNumber);
},

err => {
if(!this._channel) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when we
// reconnect.
this._messages.unshift(this._unconfirmedMessages.shift());
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be the
// broker rejected the message. Either way, reject it back
this._messages.shift();
this._unconfirmedMessages.shift();
message.reject(err);

// Send some more!
this._publishQueuedMessages(workerNumber);
}
}
)
Expand Down
25 changes: 24 additions & 1 deletion test/ChannelWrapperTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -589,4 +589,27 @@ describe('ChannelWrapper', function() {
return p1;
}).then(() => expect(publishCalls).to.equal(2));
});
});

it('should publish enqued messages to the underlying channel without waiting for confirms', function() {
connectionManager.simulateConnect();
let p1, p2;
const channelWrapper = new ChannelWrapper(connectionManager, {
setup(channel) {
channel.publish = sinon.stub();
return Promise.resolve();
}
});

return channelWrapper.waitForConnect()
.then(() => {
p1 = channelWrapper.publish('exchange', 'routingKey', 'msg:1');
p2 = channelWrapper.publish('exchange', 'routingKey', 'msg:2');
return promiseTools.delay(10);
}).then(() => {
const channel = channelWrapper._channel;
expect(channel.publish.calledTwice).to.be.true;
expect(p1).to.not.be.fulfilled;
expect(p2).to.not.be.fulfilled;
});
});
});

0 comments on commit ceacfb5

Please sign in to comment.