From e1457a598c6ecffca9c864036f1875f546ad5017 Mon Sep 17 00:00:00 2001 From: Jason Walton Date: Thu, 26 Aug 2021 10:11:50 -0400 Subject: [PATCH 1/2] fix: Fix handling of resending messages during a disconnect. The unit tests for this had a fatal flaw - they assumed that if the connection dropped, we'd get nothing back for any in-flight messages. This isn't true, though - we'd actually get back an error from amqplib when the underlying connection fails. This fixes the tests to reflect this. If we rely on amqplib to reject such messages, then moving all messages from _unconfirmedMessages to _messages on a reconnect now becomes superfluous. I also reworked `_publishQueuedMessages()` to be more synchronous. As it stood, I had to add a lot of pointless delays in my tests to make sure that the `then` after publishing a message had time to run. fix #152 --- src/ChannelWrapper.ts | 178 +++++++++++++++++-------------------- test/ChannelWrapperTest.ts | 135 ++++++++++++++++++++++------ 2 files changed, 192 insertions(+), 121 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 4f077bf..a73f2a8 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -361,14 +361,6 @@ export default class ChannelWrapper extends EventEmitter { return; } - if (this._unconfirmedMessages.length > 0) { - // requeue any messages that were left unconfirmed when connection was lost - let message: Message | undefined; - while ((message = this._unconfirmedMessages.shift())) { - this._messages.push(message); - } - } - // Since we just connected, publish any queued messages this._startWorker(); this.emit('connect'); @@ -450,6 +442,25 @@ export default class ChannelWrapper extends EventEmitter { return !this._irrecoverableCode || !IRRECOVERABLE_ERRORS.includes(this._irrecoverableCode); } + private _messageResolved(message: Message, result: boolean) { + removeUnconfirmedMessage(this._unconfirmedMessages, message); + message.resolve(result); + } + + private _messageRejected(message: Message, err: Error) { + if (!this._channel && this._canWaitReconnection()) { + // Tried to write to a closed channel. Leave the message in the queue and we'll try again when + // we reconnect. + removeUnconfirmedMessage(this._unconfirmedMessages, message); + this._messages.push(message); + } else { + // Something went wrong trying to send this message - could be JSON.stringify failed, could be + // the broker rejected the message. Either way, reject it back + removeUnconfirmedMessage(this._unconfirmedMessages, message); + message.reject(err); + } + } + private _publishQueuedMessages(workerNumber: number): void { const channel = this._channel; if ( @@ -467,97 +478,72 @@ export default class ChannelWrapper extends EventEmitter { if (message) { this._unconfirmedMessages.push(message); - Promise.resolve() - .then(() => { - let encodedMessage: Buffer; - if (this._json) { - encodedMessage = Buffer.from(JSON.stringify(message.content)); - } else if (typeof message.content === 'string') { - encodedMessage = Buffer.from(message.content); - } else if (message.content instanceof Buffer) { - encodedMessage = message.content; - } else if ( - typeof message.content === 'object' && - typeof (message.content as any).toString === 'function' - ) { - encodedMessage = Buffer.from((message.content as any).toString()); - } else { - throw new Error('Invalid message content'); + try { + let encodedMessage: Buffer | undefined; + if (this._json) { + encodedMessage = Buffer.from(JSON.stringify(message.content)); + } else if (typeof message.content === 'string') { + encodedMessage = Buffer.from(message.content); + } else if (message.content instanceof Buffer) { + encodedMessage = message.content; + } else if ( + typeof message.content === 'object' && + typeof (message.content as any).toString === 'function' + ) { + encodedMessage = Buffer.from((message.content as any).toString()); + } else { + this._messageRejected(message, new Error('Invalid message content')); + } + + let result = true; + if (encodedMessage) { + switch (message.type) { + case 'publish': + result = channel.publish( + message.exchange, + message.routingKey, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, result); + } + } + ); + break; + case 'sendToQueue': + result = channel.sendToQueue( + message.queue, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, result); + } + } + ); + break; + /* istanbul ignore next */ + default: + throw new Error(`Unhandled message type ${(message as any).type}`); } + } - let result = true; - const sendPromise = (() => { - switch (message.type) { - case 'publish': - return new Promise(function (resolve, reject) { - result = channel.publish( - message.exchange, - message.routingKey, - encodedMessage, - message.options, - (err) => { - if (err) { - reject(err); - } else { - resolve(result); - } - } - ); - }); - case 'sendToQueue': - return new Promise(function (resolve, reject) { - result = channel.sendToQueue( - message.queue, - encodedMessage, - message.options, - (err) => { - if (err) { - reject(err); - } else { - resolve(result); - } - } - ); - }); - /* istanbul ignore next */ - default: - throw new Error(`Unhandled message type ${(message as any).type}`); - } - })(); + if (result) { + setImmediate(() => this._publishQueuedMessages(workerNumber)); + } else { + channel.once('drain', () => this._publishQueuedMessages(workerNumber)); + } - if (result) { - this._publishQueuedMessages(workerNumber); - } else { - channel.once('drain', () => this._publishQueuedMessages(workerNumber)); - } - return sendPromise; - }) - .then( - (result) => { - removeUnconfirmedMessage(this._unconfirmedMessages, message); - message.resolve(result); - }, - - (err) => { - if (!this._channel && this._canWaitReconnection()) { - // Tried to write to a closed channel. Leave the message in the queue and we'll try again when - // we reconnect. - removeUnconfirmedMessage(this._unconfirmedMessages, message); - this._messages.push(message); - } else { - // Something went wrong trying to send this message - could be JSON.stringify failed, could be - // the broker rejected the message. Either way, reject it back - removeUnconfirmedMessage(this._unconfirmedMessages, message); - message.reject(err); - } - } - ) - .catch( - /* istanbul ignore next */ (err) => { - this.emit('error', err); - this._working = false; - } - ); + /* istanbul ignore next */ + } catch (err) { + this.emit('error', err); + this._working = false; + } } } diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 784ad4d..2d7d1ef 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -703,11 +703,8 @@ describe('ChannelWrapper', function () { channelWrapper.publish('exchange', 'routingKey', 'content1'); const p2 = channelWrapper.publish('exchange', 'routingKey', 'content2'); - - // Wait for both messages to be sent. - while (callbacks.length < 2) { - await promiseTools.delay(10); - } + await promiseTools.delay(10); + expect(callbacks).to.have.length(2); // Nack the second message. callbacks.find((c) => c.message.toString() === 'content2')?.cb(new Error('boom')); @@ -716,10 +713,10 @@ describe('ChannelWrapper', function () { // Simulate a disconnect and reconnect. connectionManager.simulateDisconnect(); await promiseTools.delay(10); + callbacks.find((c) => c.message.toString() === 'content1')?.cb(new Error('disconnected')); connectionManager.simulateConnect(); - while (callbacks.length < 3) { - await promiseTools.delay(10); - } + await promiseTools.delay(10); + expect(callbacks).to.have.length(3); // Make sure the first message is resent. const resent = callbacks[callbacks.length - 1]; @@ -727,7 +724,7 @@ describe('ChannelWrapper', function () { }); it('should keep sending messages, even if we disconnect in the middle of sending', async function () { - let publishCalls = 0; + const callbacks: ((err: Error | undefined) => void)[] = []; connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager, { @@ -739,12 +736,86 @@ describe('ChannelWrapper', function () { _options: amqplib.Options.Publish, cb: (err?: Error) => void ) { - publishCalls++; - if (publishCalls === 1) { - // Never reply, this channel is disconnected - } else { - cb(); - } + callbacks.push(cb); + return true; + }; + + return Promise.resolve(); + }, + }); + + await channelWrapper.waitForConnect(); + const p1 = channelWrapper.publish('exchange', 'routingKey', 'content'); + + // Disconnect, and generate an error for the in-flight message. + await promiseTools.delay(10); + connectionManager.simulateDisconnect(); + expect(callbacks).to.have.length(1); + callbacks[0](new Error('disconnected')); + + // Reconnect. Should resend the message. + await promiseTools.delay(10); + connectionManager.simulateConnect(); + await promiseTools.delay(10); + expect(callbacks).to.have.length(2); + callbacks[1](undefined); + await p1; + }); + + it('should handle getting a confirm out-of-order with a disconnect', async function () { + const callbacks: ((err: Error | undefined) => void)[] = []; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + setup(channel: amqplib.ConfirmChannel) { + channel.publish = function ( + _exchange: string, + _routingKey: string, + _message: Buffer, + _options: amqplib.Options.Publish, + cb: (err?: Error) => void + ) { + callbacks.push(cb); + return true; + }; + + return Promise.resolve(); + }, + }); + + await channelWrapper.waitForConnect(); + const p1 = channelWrapper.publish('exchange', 'routingKey', 'content'); + + // Disconnect. + await promiseTools.delay(10); + connectionManager.simulateDisconnect(); + expect(callbacks).to.have.length(1); + + // Message succeeds after disconnect. + callbacks[0](undefined); + + // Reconnect. Should resend the message. + await promiseTools.delay(10); + connectionManager.simulateConnect(); + await promiseTools.delay(10); + expect(callbacks).to.have.length(1); + await p1; + }); + + it('should handle getting a confirm out-of-order with a disconnect and reconnect', async function () { + const callbacks: ((err: Error | undefined) => void)[] = []; + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + setup(channel: amqplib.ConfirmChannel) { + channel.publish = function ( + _exchange: string, + _routingKey: string, + _message: Buffer, + _options: amqplib.Options.Publish, + cb: (err?: Error) => void + ) { + callbacks.push(cb); return true; }; @@ -754,12 +825,21 @@ describe('ChannelWrapper', function () { await channelWrapper.waitForConnect(); const p1 = channelWrapper.publish('exchange', 'routingKey', 'content'); + + // Disconnect. await promiseTools.delay(10); connectionManager.simulateDisconnect(); + expect(callbacks).to.have.length(1); + + // Reconnect. await promiseTools.delay(10); connectionManager.simulateConnect(); + await promiseTools.delay(10); + + // Message from first connection succeeds after disconnect/reconnect. + expect(callbacks).to.have.length(1); + callbacks[0](undefined); await p1; - expect(publishCalls).to.equal(2); }); it('should emit an error, we disconnect during publish with code 502 (AMQP Frame Syntax Error)', function () { @@ -793,7 +873,7 @@ describe('ChannelWrapper', function () { }); it('should retry, we disconnect during publish with code 320 (AMQP Connection Forced Error)', async function () { - let publishCalls = 0; + const callbacks: ((err: Error | undefined) => void)[] = []; connectionManager.simulateConnect(); const err = new Error('AMQP Frame Syntax Error'); @@ -807,13 +887,7 @@ describe('ChannelWrapper', function () { _options: amqplib.Options.Publish, cb: (err?: Error) => void ) { - publishCalls++; - if (publishCalls === 1) { - // Never reply, this channel is disconnected - connectionManager.simulateRemoteCloseEx(err); - } else { - cb(); - } + callbacks.push(cb); return true; }; return Promise.resolve(); @@ -823,9 +897,20 @@ describe('ChannelWrapper', function () { await channelWrapper.waitForConnect(); const p1 = channelWrapper.publish('exchange', 'routingKey', 'content'); await promiseTools.delay(10); + expect(callbacks).to.have.length(1); + + // Simulate disconnect during publish. + connectionManager.simulateRemoteCloseEx(err); + callbacks[0](new Error('disconnected')); + + // Reconnect. connectionManager.simulateConnect(); + await promiseTools.delay(10); + + // Message should be republished. + expect(callbacks).to.have.length(2); + callbacks[1](undefined); await p1; - expect(publishCalls).to.equal(2); }); it('should publish queued messages to the underlying channel without waiting for confirms', async function () { From b866ef25ebe97c1cf4fe421835291584cb738f41 Mon Sep 17 00:00:00 2001 From: Jason Walton Date: Thu, 26 Aug 2021 11:00:39 -0400 Subject: [PATCH 2/2] perf: Send messages to underlying channel in synchronous batches. --- src/ChannelWrapper.ts | 160 +++++++++++++++++++++++++----------------- 1 file changed, 97 insertions(+), 63 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index a73f2a8..6a050f4 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -4,6 +4,8 @@ import { EventEmitter } from 'events'; import pb from 'promise-breaker'; import { IAmqpConnectionManager } from './AmqpConnectionManager.js'; +const MAX_MESSAGES_PER_BATCH = 1000; + export type SetupFunc = | ((channel: ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: ConfirmChannel) => Promise); @@ -106,6 +108,11 @@ export default class ChannelWrapper extends EventEmitter { */ private _workerNumber = 0; + /** + * True if the underlying channel has room for more messages. + */ + private _channelHasRoom = true; + public name?: string; addListener(event: string, listener: (...args: any[]) => void): this; @@ -337,7 +344,9 @@ export default class ChannelWrapper extends EventEmitter { const channel = await connection.createConfirmChannel(); this._channel = channel; + this._channelHasRoom = true; channel.on('close', () => this._onChannelClose(channel)); + channel.on('drain', () => this._onChannelDrain()); this._settingUp = Promise.all( this._setups.map((setupFn) => @@ -379,6 +388,12 @@ export default class ChannelWrapper extends EventEmitter { // Wait for another reconnect to create a new channel. } + /** Called whenever the channel drains. */ + private _onChannelDrain(): void { + this._channelHasRoom = true; + this._startWorker(); + } + // Called whenever we disconnect from the AMQP server. private _onDisconnect(ex: { err: Error & { code: number } }): void { this._irrecoverableCode = ex.err instanceof Error ? ex.err.code : undefined; @@ -425,7 +440,9 @@ export default class ChannelWrapper extends EventEmitter { } private _shouldPublish(): boolean { - return this._messages.length > 0 && !this._settingUp && !!this._channel; + return ( + this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom + ); } // Start publishing queued messages, if there isn't already a worker doing this. @@ -461,6 +478,27 @@ export default class ChannelWrapper extends EventEmitter { } } + private _getEncodedMessage(content: Message['content']): Buffer { + let encodedMessage: Buffer; + + if (this._json) { + encodedMessage = Buffer.from(JSON.stringify(content)); + } else if (typeof content === 'string') { + encodedMessage = Buffer.from(content); + } else if (content instanceof Buffer) { + encodedMessage = content; + } else if (typeof content === 'object' && typeof (content as any).toString === 'function') { + encodedMessage = Buffer.from((content as any).toString()); + } else { + console.warn( + 'amqp-connection-manager: Sending JSON message, but json option not speicifed' + ); + encodedMessage = Buffer.from(JSON.stringify(content)); + } + + return encodedMessage; + } + private _publishQueuedMessages(workerNumber: number): void { const channel = this._channel; if ( @@ -474,76 +512,72 @@ export default class ChannelWrapper extends EventEmitter { return; } - const message = this._messages.shift(); - if (message) { - this._unconfirmedMessages.push(message); - - try { - let encodedMessage: Buffer | undefined; - if (this._json) { - encodedMessage = Buffer.from(JSON.stringify(message.content)); - } else if (typeof message.content === 'string') { - encodedMessage = Buffer.from(message.content); - } else if (message.content instanceof Buffer) { - encodedMessage = message.content; - } else if ( - typeof message.content === 'object' && - typeof (message.content as any).toString === 'function' - ) { - encodedMessage = Buffer.from((message.content as any).toString()); - } else { - this._messageRejected(message, new Error('Invalid message content')); + try { + // Send messages in batches of 1000 - don't want to starve the event loop. + let sendsLeft = MAX_MESSAGES_PER_BATCH; + while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) { + sendsLeft--; + + const message = this._messages.shift(); + if (!message) { + break; } - let result = true; - if (encodedMessage) { - switch (message.type) { - case 'publish': - result = channel.publish( - message.exchange, - message.routingKey, - encodedMessage, - message.options, - (err) => { - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, result); - } + this._unconfirmedMessages.push(message); + + const encodedMessage = this._getEncodedMessage(message.content); + + switch (message.type) { + case 'publish': { + let thisCanSend = true; + thisCanSend = this._channelHasRoom = channel.publish( + message.exchange, + message.routingKey, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); } - ); - break; - case 'sendToQueue': - result = channel.sendToQueue( - message.queue, - encodedMessage, - message.options, - (err) => { - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, result); - } + } + ); + break; + } + case 'sendToQueue': { + let thisCanSend = true; + thisCanSend = this._channelHasRoom = channel.sendToQueue( + message.queue, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); } - ); - break; - /* istanbul ignore next */ - default: - throw new Error(`Unhandled message type ${(message as any).type}`); + } + ); + break; } + /* istanbul ignore next */ + default: + throw new Error(`Unhandled message type ${(message as any).type}`); } + } - if (result) { - setImmediate(() => this._publishQueuedMessages(workerNumber)); - } else { - channel.once('drain', () => this._publishQueuedMessages(workerNumber)); - } - - /* istanbul ignore next */ - } catch (err) { - this.emit('error', err); - this._working = false; + // If we didn't send all the messages, send some more... + if (this._channelHasRoom && this._messages.length > 0) { + setImmediate(() => this._publishQueuedMessages(workerNumber)); } + + this._working = false; + + /* istanbul ignore next */ + } catch (err) { + this._working = false; + this.emit('error', err); } }