From 1816233aacee749e958703073ecacd443b9ed1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tam=C3=A1s=20Hugy=C3=A1k?= Date: Tue, 13 Apr 2021 17:07:07 +0200 Subject: [PATCH] feat(connection-uri): add custom protocols --- README.md | 22 ++++++++++++ src/exception/index.ts | 1 + ...p-invalid-connection-protocol.exception.ts | 5 +++ src/service/amqp/amqp.service.spec.ts | 34 +++++++++++++++++++ src/service/amqp/amqp.service.ts | 34 +++++++++++++++++-- 5 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 src/exception/index.ts create mode 100644 src/exception/nest-amqp-invalid-connection-protocol.exception.ts diff --git a/README.md b/README.md index 315d7dd..6931c6a 100644 --- a/README.md +++ b/README.md @@ -38,6 +38,28 @@ To create a connection, you have to set the connection details. The library prov connection URI. The library will parse this connection URI and set the appropriate connection options. Besides, you can add your custom connection options or other library settings with the module options. +#### Connection URI + +This library provides an easier way to set the connection options with a connection URI: you can describe the connection settings with a +URI. The library will parse the URI and set the corresponding options. Here you can see the description of the URI: +```bash +protocol://[username:password@]host:port +``` + +The `username` and `password` components are optional, with these you can set the authentication credentials to the message queue server. + +You can set custom protocol what will set the connection transport automatically, so you don't have to add the `transport` to the connection +options object. The protocol can be: +* **amqp**: in this case the `transport` will be `tcp` +* **amqps**: in this case the `transport` will be `ssl` +* **amqp+ssl**: in this case the `transport` will be `ssl` +* **amqp+tls**: in this case the `transport` will be `tls` + +Examples: +* `amqp://localhost:5672` +* `amqps://user:password@my-server.com:5672` +* `amqp+tls://admin:secret@127.0.0.1:5672` + #### Create connection To create a connection, you have to import the `QueueModule.forRoot()` module into your application's root module. The `forRoot()` diff --git a/src/exception/index.ts b/src/exception/index.ts new file mode 100644 index 0000000..143dfd1 --- /dev/null +++ b/src/exception/index.ts @@ -0,0 +1 @@ +export * from './nest-amqp-invalid-connection-protocol.exception'; diff --git a/src/exception/nest-amqp-invalid-connection-protocol.exception.ts b/src/exception/nest-amqp-invalid-connection-protocol.exception.ts new file mode 100644 index 0000000..09326de --- /dev/null +++ b/src/exception/nest-amqp-invalid-connection-protocol.exception.ts @@ -0,0 +1,5 @@ +export class NestAmqpInvalidConnectionProtocolException extends Error { + constructor(message: string) { + super(message); + } +} diff --git a/src/service/amqp/amqp.service.spec.ts b/src/service/amqp/amqp.service.spec.ts index 6e4dc55..1279588 100644 --- a/src/service/amqp/amqp.service.spec.ts +++ b/src/service/amqp/amqp.service.spec.ts @@ -1,4 +1,5 @@ import { Test, TestingModule } from '@nestjs/testing'; +import MockInstance = jest.MockInstance; jest.mock('rhea-promise'); @@ -7,6 +8,7 @@ import { Connection, ConnectionEvents, ReceiverEvents, SenderEvents } from 'rhea import { EventContextMock } from '../../test/event-context.mock'; import { AMQP_CLIENT_TOKEN, QUEUE_MODULE_OPTIONS } from '../../constant'; import { QueueModuleOptions } from '../../interface'; +import { NestAmqpInvalidConnectionProtocolException } from '../../exception'; import { AMQPService } from './amqp.service'; @@ -21,6 +23,10 @@ describe('AMQPService', () => { let senderEvents: Array<{ event: SenderEvents; callback: (context: any) => any }> = []; let connectionOpenMock: jest.Mock = jest.fn().mockResolvedValue(null); const receiverEvents: Array<{ event: ReceiverEvents; callback: (context: any) => any }> = []; + const getLastMockCall = (obj: MockInstance) => { + const mockCalls = obj.mock.calls; + return mockCalls[mockCalls.length - 1]; + }; beforeAll(() => { // mock the Connection constructor @@ -90,6 +96,34 @@ describe('AMQPService', () => { await expect(AMQPService.createConnection(null)).rejects.toThrow(/connection options must an object/); }); + describe('connection protocol', () => { + it('should work with amqp:// protocol', async () => { + await AMQPService.createConnection({ connectionUri: 'amqp://localhost:5672' }); + expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'tcp' })); + }); + + it('should work with amqps:// protocol', async () => { + await AMQPService.createConnection({ connectionUri: 'amqps://localhost:5672' }); + expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'ssl' })); + }); + + it('should work with amqp+ssl:// protocol', async () => { + await AMQPService.createConnection({ connectionUri: 'amqp+ssl://localhost:5672' }); + expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'ssl' })); + }); + + it('should work with amqp+tls:// protocol', async () => { + await AMQPService.createConnection({ connectionUri: 'amqp+tls://localhost:5672' }); + expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'tls' })); + }); + + it('should throw error on unsupported protocol', async () => { + await expect(AMQPService.createConnection({ connectionUri: 'stomp://localhost:5672' })).rejects.toThrowError( + NestAmqpInvalidConnectionProtocolException, + ); + }); + }); + describe('connection options', () => { it('should not throw connection error by default', async () => { connectionOpenMock = jest.fn().mockRejectedValue(new Error('Test')); diff --git a/src/service/amqp/amqp.service.ts b/src/service/amqp/amqp.service.ts index 023ee1a..34cb0c2 100644 --- a/src/service/amqp/amqp.service.ts +++ b/src/service/amqp/amqp.service.ts @@ -1,11 +1,23 @@ import { Inject, Injectable } from '@nestjs/common'; import { EventEmitter } from 'events'; -import { AwaitableSender, Connection, ConnectionEvents, EventContext, Receiver, ReceiverEvents, SenderEvents } from 'rhea-promise'; +import { + AwaitableSender, + Connection, + ConnectionEvents, + ConnectionOptions, + EventContext, + Receiver, + ReceiverEvents, + SenderEvents, +} from 'rhea-promise'; import { URL } from 'url'; import { Logger } from '../../util'; import { QueueModuleOptions } from '../../interface'; import { AMQP_CLIENT_TOKEN, AMQP_CONNECTION_RECONNECT, QUEUE_MODULE_OPTIONS } from '../../constant'; +import { NestAmqpInvalidConnectionProtocolException } from '../../exception'; + +type PropType = TObj[TProp]; /** * Can create a single connection and manage the senders and receivers for it. @@ -52,10 +64,28 @@ export class AMQPService { port, }); + let transport: PropType; + switch (protocol) { + case 'amqp:': + transport = 'tcp'; + break; + case 'amqps:': + transport = 'ssl'; + break; + case 'amqp+ssl:': + transport = 'ssl'; + break; + case 'amqp+tls:': + transport = 'tls'; + break; + default: + throw new NestAmqpInvalidConnectionProtocolException(`Not supported connection protocol: ${protocol}`); + } + const connection = new Connection({ password, username, - transport: protocol === 'amqps:' ? 'ssl' : 'tcp', + transport, host: hostname, port: Number.parseInt(port, 10), ...rheaConnectionOptions,