Skip to content

Commit

Permalink
feat: cancel specific consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
luddd3 committed Jan 26, 2022
1 parent a44f8b3 commit 5f3b2eb
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import type * as amqplib from 'amqplib';
import { Options } from 'amqplib';
import crypto from 'crypto';
import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import { promisify } from 'util';
import { IAmqpConnectionManager } from './AmqpConnectionManager.js';

const MAX_MESSAGES_PER_BATCH = 1000;

const randomBytes = promisify(crypto.randomBytes);

export type Channel = amqplib.ConfirmChannel | amqplib.Channel;

export type SetupFunc =
Expand Down Expand Up @@ -718,15 +722,20 @@ export default class ChannelWrapper extends EventEmitter {
queue: string,
onMessage: Consumer['onMessage'],
options: ConsumerOptions = {}
): Promise<void> {
): Promise<amqplib.Replies.Consume> {
const consumerTag = options.consumerTag || (await randomBytes(16)).toString('hex');
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
options,
options: {
...options,
consumerTag,
},
};
this._consumers.push(consumer);
await this._consume(consumer);
return { consumerTag };
}

private async _consume(consumer: Consumer): Promise<void> {
Expand Down Expand Up @@ -793,6 +802,19 @@ export default class ChannelWrapper extends EventEmitter {
);
}

async cancel(consumerTag: string): Promise<void> {
const idx = this._consumers.findIndex((x) => x.options.consumerTag === consumerTag);
if (idx === -1) {
return;
}

const consumer = this._consumers[idx];
this._consumers.splice(idx, 1);
if (this._channel && consumer.consumerTag) {
await this._channel.cancel(consumer.consumerTag);
}
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
65 changes: 65 additions & 0 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,71 @@ describe('ChannelWrapper', function () {
expect(consumerTag).to.equal(2);
expect(canceledTags).to.deep.equal(['0', '1']);
});

it('should be able to cancel specific consumers', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
const canceledTags: number[] = [];

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({
consumerTag: _options.consumerTag,
});
});
channel.cancel = jest.fn().mockImplementation((consumerTag) => {
canceledTags.push(consumerTag);
if (consumerTag === '0') {
onQueue1(null);
} else if (consumerTag === '1') {
onQueue2(null);
}
return Promise.resolve();
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
const { consumerTag: consumerTag1 } = await channelWrapper.consume(
'queue1',
(msg) => {
queue1.push(msg);
},
{ consumerTag: '1' }
);

const queue2: any[] = [];
const { consumerTag: consumerTag2 } = await channelWrapper.consume(
'queue2',
(msg) => {
queue2.push(msg);
},
{ consumerTag: '2' }
);

onQueue1(1);
onQueue2(1);

await channelWrapper.cancel(consumerTag1);
await channelWrapper.cancel(consumerTag2);

// Consumers shouldn't be resumed after reconnect when canceled
connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(queue1).to.deep.equal([1]);
expect(queue2).to.deep.equal([1]);
expect(canceledTags).to.deep.equal(['1', '2']);
});
});

/** Returns the arguments of the most recent call to this mock. */
Expand Down

0 comments on commit 5f3b2eb

Please sign in to comment.