Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rabbitmq): add exchange-to-exchange bindings config #7

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions integration/rabbitmq/e2e/configuration.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -582,5 +582,85 @@ describe('Module Configuration', () => {
await app.close();
});
});

it('should create exchange bindings', async () => {
const originalConnect = amqplib.connect;
let bindExchangeSpy;

const connectSpy = jest
.spyOn(amqplib, 'connect')
.mockImplementation((...args) => {
const result = originalConnect(...args);
result.then((conn) => {
const originalCreateConfirmChannel = conn.createConfirmChannel;
jest
.spyOn(conn, 'createConfirmChannel')
.mockImplementation(function (...args) {
const result = originalCreateConfirmChannel.apply(this, args);
result.then((channel) => {
bindExchangeSpy = jest.spyOn(channel, 'bindExchange');
});
return result;
});
});
return result;
});

const otherExchangeName = 'otherExchange';

app = await Test.createTestingModule({
imports: [
RabbitMQModule.forRootAsync(RabbitMQModule, {
useFactory: async () => {
return {
exchanges: [
{
name: nonExistingExchange,
type: 'topic',
createExchangeIfNotExists: true,
},
{
name: otherExchangeName,
type: 'topic',
createExchangeIfNotExists: true,
},
],
exchangeBindings: [
{
destination: otherExchangeName,
source: nonExistingExchange,
pattern: '*',
},
],
uri,
connectionInitOptions: {
wait: true,
reject: true,
timeout: 3000,
},
};
},
}),
],
}).compile();

const amqpConnection = app.get<AmqpConnection>(AmqpConnection);
expect(app).toBeDefined();

expect(connectSpy).toHaveBeenCalledTimes(1);
expect(connectSpy).toHaveBeenCalledWith(amqplibUri, undefined);
expect(bindExchangeSpy).toHaveBeenCalledWith(
otherExchangeName,
nonExistingExchange,
'*',
undefined,
);

await app.init();
expect(
await amqpConnection.channel.checkExchange(nonExistingExchange),
).toBeDefined();
await app.close();
});
});
});
30 changes: 30 additions & 0 deletions integration/rabbitmq/e2e/subscribe.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const exchange = 'testSubscribeExchange';
const routingKey1 = 'testSubscribeRoute1';
const routingKey2 = 'testSubscribeRoute2';
const routingKey3 = 'testSubscribeViaHandlerRoute';
const routingKey4 = 'testSubscribeViaHandlerRouteMulti1';
const routingKey5 = 'testSubscribeViaHandlerRouteMulti2';
const nonJsonRoutingKey = 'nonJsonSubscribeRoute';

const createRoutingKey = 'test.create.object';
Expand All @@ -40,6 +42,13 @@ class SubscribeService {
testHandler(message);
}

@RabbitSubscribe({
name: 'handler2',
})
handleSubscribeByNameMulti(message: object) {
testHandler(message);
}

@RabbitSubscribe({
exchange,
routingKey: [routingKey1, routingKey2],
Expand Down Expand Up @@ -152,6 +161,16 @@ describe('Rabbit Subscribe', () => {
exchange,
routingKey: [routingKey3],
},
handler2: [
{
exchange,
routingKey: routingKey4,
},
{
exchange,
routingKey: routingKey5,
},
],
},
uri,
connectionInitOptions: { wait: true, reject: true, timeout: 3000 },
Expand Down Expand Up @@ -196,6 +215,17 @@ describe('Rabbit Subscribe', () => {
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
});

it('should receive all messages when subscribed via handler name with multiple configs', async () => {
await amqpConnection.publish(exchange, routingKey4, 'testMessage');
await amqpConnection.publish(exchange, routingKey5, 'testMessage2');

await new Promise((resolve) => setTimeout(resolve, 50));

expect(testHandler).toHaveBeenCalledTimes(2);
expect(testHandler).toHaveBeenCalledWith(`testMessage`);
expect(testHandler).toHaveBeenCalledWith(`testMessage2`);
});

it('should work with a topic exchange set up that has multiple subscribers with similar routing keys', async () => {
const routingKeys = [createRoutingKey, updateRoutingKey, deleteRoutingKey];

Expand Down
12 changes: 12 additions & 0 deletions packages/rabbitmq/src/amqp/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const defaultConfig = {
),
defaultSubscribeErrorBehavior: MessageHandlerErrorBehavior.REQUEUE,
exchanges: [],
exchangeBindings: [],
queues: [],
defaultRpcTimeout: 10000,
connectionInitOptions: {
Expand Down Expand Up @@ -291,6 +292,17 @@ export class AmqpConnection {
})
);

await Promise.all(
this.config.exchangeBindings.map((exchangeBinding) =>
channel.bindExchange(
exchangeBinding.destination,
exchangeBinding.source,
exchangeBinding.pattern,
exchangeBinding.args
)
)
);

await this.setupQueuesWithBindings(channel, this.config.queues);

if (this.config.enableDirectReplyTo) {
Expand Down
13 changes: 12 additions & 1 deletion packages/rabbitmq/src/rabbitmq.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ export interface RabbitMQQueueConfig {
bindQueueArguments?: any;
}

export interface RabbitMQExchangeBindingConfig {
destination: string;
source: string;
pattern: string;
args?: any;
}

export type ConsumeOptions = Options.Consume;

export interface MessageOptions {
Expand Down Expand Up @@ -97,7 +104,10 @@ export interface ConnectionInitOptions {
}

export type RabbitMQChannels = Record<string, RabbitMQChannelConfig>;
export type RabbitMQHandlers = Record<string, MessageHandlerOptions>;
export type RabbitMQHandlers = Record<
string,
MessageHandlerOptions | MessageHandlerOptions[]
>;

export interface RabbitMQConfig {
name?: string;
Expand All @@ -107,6 +117,7 @@ export interface RabbitMQConfig {
*/
prefetchCount?: number;
exchanges?: RabbitMQExchangeConfig[];
exchangeBindings?: RabbitMQExchangeBindingConfig[];
queues?: RabbitMQQueueConfig[];
defaultRpcTimeout?: number;
defaultExchangeType?: string;
Expand Down
85 changes: 57 additions & 28 deletions packages/rabbitmq/src/rabbitmq.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { DiscoveryModule, DiscoveryService } from '@golevelup/nestjs-discovery';
import {
DiscoveredMethod,
DiscoveredMethodWithMeta,
DiscoveryModule,
DiscoveryService,
} from '@golevelup/nestjs-discovery';
import {
createConfigurableDynamicRootModule,
IConfigurableDynamicRootModule,
Expand Down Expand Up @@ -135,6 +140,40 @@ export class RabbitMQModule
RabbitMQModule.bootstrapped = false;
}

private async setupHandler(
connection: AmqpConnection,
discoveredMethod: DiscoveredMethod,
config: RabbitHandlerConfig,
handler: (...args: any[]) => Promise<any>
) {
const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
config.queueOptions?.channel ? `${config.queueOptions.channel}::` : ''
}${config.exchange}::${config.routingKey}::${config.queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}

this.logger.log(handlerDisplayName);

return config.type === 'rpc'
? connection.createRpc(handler, config)
: connection.createSubscriber(
handler,
config,
discoveredMethod.methodName
);
}

// eslint-disable-next-line sonarjs/cognitive-complexity
public async onApplicationBootstrap() {
if (RabbitMQModule.bootstrapped) {
Expand Down Expand Up @@ -199,38 +238,28 @@ export class RabbitMQModule
'rmq' // contextType
);

const mergedConfig = {
...config,
...connection.configuration.handlers[config.name || ''],
};
const { exchange, routingKey, queue, queueOptions } = mergedConfig;
const moduleHandlerConfigRaw =
connection.configuration.handlers[config.name || ''];

const handlerDisplayName = `${discoveredMethod.parentClass.name}.${
discoveredMethod.methodName
} {${config.type}} -> ${
// eslint-disable-next-line sonarjs/no-nested-template-literals
queueOptions?.channel ? `${queueOptions.channel}::` : ''
}${exchange}::${routingKey}::${queue || 'amqpgen'}`;

if (
config.type === 'rpc' &&
!connection.configuration.enableDirectReplyTo
) {
this.logger.warn(
`Direct Reply-To Functionality is disabled. RPC handler ${handlerDisplayName} will not be registered`
);
return;
}
const moduleHandlerConfigs = Array.isArray(moduleHandlerConfigRaw)
? moduleHandlerConfigRaw
: [moduleHandlerConfigRaw];

this.logger.log(handlerDisplayName);
await Promise.all(
moduleHandlerConfigs.map((moduleHandlerConfig) => {
const mergedConfig = {
...config,
...moduleHandlerConfig,
};

return config.type === 'rpc'
? connection.createRpc(handler, mergedConfig)
: connection.createSubscriber(
handler,
return this.setupHandler(
connection,
discoveredMethod,
mergedConfig,
discoveredMethod.methodName
handler
);
})
);
})
);
}
Expand Down
Loading