Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix handling of resending messages during a disconnect. #178

Merged
merged 2 commits into from
Aug 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 121 additions & 101 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import { IAmqpConnectionManager } from './AmqpConnectionManager.js';

const MAX_MESSAGES_PER_BATCH = 1000;

export type SetupFunc =
| ((channel: ConfirmChannel, callback: (error?: Error) => void) => void)
| ((channel: ConfirmChannel) => Promise<void>);
Expand Down Expand Up @@ -106,6 +108,11 @@ export default class ChannelWrapper extends EventEmitter {
*/
private _workerNumber = 0;

/**
* True if the underlying channel has room for more messages.
*/
private _channelHasRoom = true;

public name?: string;

addListener(event: string, listener: (...args: any[]) => void): this;
Expand Down Expand Up @@ -337,7 +344,9 @@ export default class ChannelWrapper extends EventEmitter {
const channel = await connection.createConfirmChannel();

this._channel = channel;
this._channelHasRoom = true;
channel.on('close', () => this._onChannelClose(channel));
channel.on('drain', () => this._onChannelDrain());

this._settingUp = Promise.all(
this._setups.map((setupFn) =>
Expand All @@ -361,14 +370,6 @@ export default class ChannelWrapper extends EventEmitter {
return;
}

if (this._unconfirmedMessages.length > 0) {
// requeue any messages that were left unconfirmed when connection was lost
let message: Message | undefined;
while ((message = this._unconfirmedMessages.shift())) {
this._messages.push(message);
}
}

// Since we just connected, publish any queued messages
this._startWorker();
this.emit('connect');
Expand All @@ -387,6 +388,12 @@ export default class ChannelWrapper extends EventEmitter {
// Wait for another reconnect to create a new channel.
}

/** Called whenever the channel drains. */
private _onChannelDrain(): void {
this._channelHasRoom = true;
this._startWorker();
}

// Called whenever we disconnect from the AMQP server.
private _onDisconnect(ex: { err: Error & { code: number } }): void {
this._irrecoverableCode = ex.err instanceof Error ? ex.err.code : undefined;
Expand Down Expand Up @@ -433,7 +440,9 @@ export default class ChannelWrapper extends EventEmitter {
}

private _shouldPublish(): boolean {
return this._messages.length > 0 && !this._settingUp && !!this._channel;
return (
this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom
);
}

// Start publishing queued messages, if there isn't already a worker doing this.
Expand All @@ -450,6 +459,46 @@ export default class ChannelWrapper extends EventEmitter {
return !this._irrecoverableCode || !IRRECOVERABLE_ERRORS.includes(this._irrecoverableCode);
}

private _messageResolved(message: Message, result: boolean) {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}

private _messageRejected(message: Message, err: Error) {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}

private _getEncodedMessage(content: Message['content']): Buffer {
let encodedMessage: Buffer;

if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(content));
} else if (typeof content === 'string') {
encodedMessage = Buffer.from(content);
} else if (content instanceof Buffer) {
encodedMessage = content;
} else if (typeof content === 'object' && typeof (content as any).toString === 'function') {
encodedMessage = Buffer.from((content as any).toString());
} else {
console.warn(
'amqp-connection-manager: Sending JSON message, but json option not speicifed'
);
encodedMessage = Buffer.from(JSON.stringify(content));
}

return encodedMessage;
}

private _publishQueuedMessages(workerNumber: number): void {
const channel = this._channel;
if (
Expand All @@ -463,101 +512,72 @@ export default class ChannelWrapper extends EventEmitter {
return;
}

const message = this._messages.shift();
if (message) {
this._unconfirmedMessages.push(message);

Promise.resolve()
.then(() => {
let encodedMessage: Buffer;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
} else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
} else if (message.content instanceof Buffer) {
encodedMessage = message.content;
} else if (
typeof message.content === 'object' &&
typeof (message.content as any).toString === 'function'
) {
encodedMessage = Buffer.from((message.content as any).toString());
} else {
throw new Error('Invalid message content');
}

let result = true;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise<boolean>(function (resolve, reject) {
result = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
case 'sendToQueue':
return new Promise<boolean>(function (resolve, reject) {
result = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
})();
try {
// Send messages in batches of 1000 - don't want to starve the event loop.
let sendsLeft = MAX_MESSAGES_PER_BATCH;
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {
sendsLeft--;

const message = this._messages.shift();
if (!message) {
break;
}

if (result) {
this._publishQueuedMessages(workerNumber);
} else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
this._unconfirmedMessages.push(message);

const encodedMessage = this._getEncodedMessage(message.content);

switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
}
);
break;
}
return sendPromise;
})
.then(
(result) => {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
},

(err) => {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}
)
.catch(
/* istanbul ignore next */ (err) => {
this.emit('error', err);
this._working = false;
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
}
);
break;
}
);
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
}

// If we didn't send all the messages, send some more...
if (this._channelHasRoom && this._messages.length > 0) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
}

this._working = false;

/* istanbul ignore next */
} catch (err) {
this._working = false;
this.emit('error', err);
}
}

Expand Down
Loading