Skip to content

Commit

Permalink
perf: Send messages to underlying channel in synchronous batches.
Browse files Browse the repository at this point in the history
  • Loading branch information
jwalton committed Aug 26, 2021
1 parent e1457a5 commit b866ef2
Showing 1 changed file with 97 additions and 63 deletions.
160 changes: 97 additions & 63 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import { IAmqpConnectionManager } from './AmqpConnectionManager.js';

const MAX_MESSAGES_PER_BATCH = 1000;

export type SetupFunc =
| ((channel: ConfirmChannel, callback: (error?: Error) => void) => void)
| ((channel: ConfirmChannel) => Promise<void>);
Expand Down Expand Up @@ -106,6 +108,11 @@ export default class ChannelWrapper extends EventEmitter {
*/
private _workerNumber = 0;

/**
* True if the underlying channel has room for more messages.
*/
private _channelHasRoom = true;

public name?: string;

addListener(event: string, listener: (...args: any[]) => void): this;
Expand Down Expand Up @@ -337,7 +344,9 @@ export default class ChannelWrapper extends EventEmitter {
const channel = await connection.createConfirmChannel();

this._channel = channel;
this._channelHasRoom = true;
channel.on('close', () => this._onChannelClose(channel));
channel.on('drain', () => this._onChannelDrain());

this._settingUp = Promise.all(
this._setups.map((setupFn) =>
Expand Down Expand Up @@ -379,6 +388,12 @@ export default class ChannelWrapper extends EventEmitter {
// Wait for another reconnect to create a new channel.
}

/** Called whenever the channel drains. */
private _onChannelDrain(): void {
this._channelHasRoom = true;
this._startWorker();
}

// Called whenever we disconnect from the AMQP server.
private _onDisconnect(ex: { err: Error & { code: number } }): void {
this._irrecoverableCode = ex.err instanceof Error ? ex.err.code : undefined;
Expand Down Expand Up @@ -425,7 +440,9 @@ export default class ChannelWrapper extends EventEmitter {
}

private _shouldPublish(): boolean {
return this._messages.length > 0 && !this._settingUp && !!this._channel;
return (
this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom
);
}

// Start publishing queued messages, if there isn't already a worker doing this.
Expand Down Expand Up @@ -461,6 +478,27 @@ export default class ChannelWrapper extends EventEmitter {
}
}

private _getEncodedMessage(content: Message['content']): Buffer {
let encodedMessage: Buffer;

if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(content));
} else if (typeof content === 'string') {
encodedMessage = Buffer.from(content);
} else if (content instanceof Buffer) {
encodedMessage = content;
} else if (typeof content === 'object' && typeof (content as any).toString === 'function') {
encodedMessage = Buffer.from((content as any).toString());
} else {
console.warn(
'amqp-connection-manager: Sending JSON message, but json option not speicifed'
);
encodedMessage = Buffer.from(JSON.stringify(content));
}

return encodedMessage;
}

private _publishQueuedMessages(workerNumber: number): void {
const channel = this._channel;
if (
Expand All @@ -474,76 +512,72 @@ export default class ChannelWrapper extends EventEmitter {
return;
}

const message = this._messages.shift();
if (message) {
this._unconfirmedMessages.push(message);

try {
let encodedMessage: Buffer | undefined;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
} else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
} else if (message.content instanceof Buffer) {
encodedMessage = message.content;
} else if (
typeof message.content === 'object' &&
typeof (message.content as any).toString === 'function'
) {
encodedMessage = Buffer.from((message.content as any).toString());
} else {
this._messageRejected(message, new Error('Invalid message content'));
try {
// Send messages in batches of 1000 - don't want to starve the event loop.
let sendsLeft = MAX_MESSAGES_PER_BATCH;
while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) {
sendsLeft--;

const message = this._messages.shift();
if (!message) {
break;
}

let result = true;
if (encodedMessage) {
switch (message.type) {
case 'publish':
result = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, result);
}
this._unconfirmedMessages.push(message);

const encodedMessage = this._getEncodedMessage(message.content);

switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
);
break;
case 'sendToQueue':
result = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, result);
}
}
);
break;
}
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
);
break;
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
);
break;
}
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
}

if (result) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
} else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
}

/* istanbul ignore next */
} catch (err) {
this.emit('error', err);
this._working = false;
// If we didn't send all the messages, send some more...
if (this._channelHasRoom && this._messages.length > 0) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
}

this._working = false;

/* istanbul ignore next */
} catch (err) {
this._working = false;
this.emit('error', err);
}
}

Expand Down

0 comments on commit b866ef2

Please sign in to comment.