Skip to content

Commit

Permalink
feat(rabbit): support multiple configs on the same handler (#682)
Browse files Browse the repository at this point in the history
Support multiple configurations on the same handler to support use cases where users want to reuse
the same handler method across multiple queues. For example, this could be useful for a situation
where messages are partitioned across multiple queues by a consistent hash exchange.

fix #624
  • Loading branch information
ttshivers authored Jan 21, 2024
1 parent 8962eed commit 93ec23f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 29 deletions.
30 changes: 30 additions & 0 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const exchange = 'testSubscribeExchange';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const routingKey3 = 'testSubscribeViaHandlerRoute';
const routingKey4 = 'testSubscribeViaHandlerRouteMulti1';
const routingKey5 = 'testSubscribeViaHandlerRouteMulti2';
const nonJsonRoutingKey = 'nonJsonSubscribeRoute';

const createRoutingKey = 'test.create.object';
Expand All @@ -40,6 +42,13 @@ class SubscribeService {
testHandler(message);
}

@RabbitSubscribe({
name: 'handler2',
})
handleSubscribeByNameMulti(message: object) {
testHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
Expand Down Expand Up @@ -152,6 +161,16 @@ describe('Rabbit Subscribe', () => {
exchange,
routingKey: [routingKey3],
},
handler2: [
{
exchange,
routingKey: routingKey4,
},
{
exchange,
routingKey: routingKey5,
},
],
},
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
Expand Down Expand Up @@ -196,6 +215,17 @@ describe('Rabbit Subscribe', () => {
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
});

it('should receive all messages when subscribed via handler name with multiple configs', async () => {
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
expect(testHandler).toHaveBeenCalledWith(`testMessage2`);
});

it('should work with a topic exchange set up that has multiple subscribers with similar routing keys', async () => {
const routingKeys = [createRoutingKey, updateRoutingKey, deleteRoutingKey];

Expand Down
5 changes: 4 additions & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ export interface ConnectionInitOptions {
}

export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;
export type RabbitMQHandlers = Record<string, MessageHandlerOptions>;
export type RabbitMQHandlers = Record<
string,
MessageHandlerOptions | MessageHandlerOptions[]
>;

export interface RabbitMQConfig {
name?: string;
Expand Down
85 changes: 57 additions & 28 deletions packages/rabbitmq/src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { DiscoveryModule, DiscoveryService } from '@golevelup/nestjs-discovery';
import {
DiscoveredMethod,
DiscoveredMethodWithMeta,
DiscoveryModule,
DiscoveryService,
} from '@golevelup/nestjs-discovery';
import {
createConfigurableDynamicRootModule,
IConfigurableDynamicRootModule,
Expand Down Expand Up @@ -135,6 +140,40 @@ export class RabbitMQModule
RabbitMQModule.bootstrapped = false;
}

private async setupHandler(
connection: AmqpConnection,
discoveredMethod: DiscoveredMethod,
config: RabbitHandlerConfig,
handler: (...args: any[]) => Promise<any>
) {
const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
config.queueOptions?.channel ? `${config.queueOptions.channel}::` : ''
}${config.exchange}::${config.routingKey}::${config.queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}

this.logger.log(handlerDisplayName);

return config.type === 'rpc'
? connection.createRpc(handler, config)
: connection.createSubscriber(
handler,
config,
discoveredMethod.methodName
);
}

// eslint-disable-next-line sonarjs/cognitive-complexity
public async onApplicationBootstrap() {
if (RabbitMQModule.bootstrapped) {
Expand Down Expand Up @@ -199,38 +238,28 @@ export class RabbitMQModule
'rmq' // contextType
);

const mergedConfig = {
...config,
...connection.configuration.handlers[config.name || ''],
};
const { exchange, routingKey, queue, queueOptions } = mergedConfig;
const moduleHandlerConfigRaw =
connection.configuration.handlers[config.name || ''];

const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
queueOptions?.channel ? `${queueOptions.channel}::` : ''
}${exchange}::${routingKey}::${queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}
const moduleHandlerConfigs = Array.isArray(moduleHandlerConfigRaw)
? moduleHandlerConfigRaw
: [moduleHandlerConfigRaw];

this.logger.log(handlerDisplayName);
await Promise.all(
moduleHandlerConfigs.map((moduleHandlerConfig) => {
const mergedConfig = {
...config,
...moduleHandlerConfig,
};

return config.type === 'rpc'
? connection.createRpc(handler, mergedConfig)
: connection.createSubscriber(
handler,
return this.setupHandler(
connection,
discoveredMethod,
mergedConfig,
discoveredMethod.methodName
handler
);
})
);
})
);
}
Expand Down

0 comments on commit 93ec23f

Please sign in to comment.