Skip to content

Commit

Permalink
feat(connection-uri): add custom protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
tahubu committed Apr 13, 2021
1 parent 523d279 commit 1816233
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 2 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ To create a connection, you have to set the connection details. The library prov
connection URI. The library will parse this connection URI and set the appropriate connection options. Besides, you can add your custom
connection options or other library settings with the module options.

#### Connection URI

This library provides an easier way to set the connection options with a connection URI: you can describe the connection settings with a
URI. The library will parse the URI and set the corresponding options. Here you can see the description of the URI:
```bash
protocol://[username:password@]host:port
```

The `username` and `password` components are optional, with these you can set the authentication credentials to the message queue server.

You can set custom protocol what will set the connection transport automatically, so you don't have to add the `transport` to the connection
options object. The protocol can be:
* **amqp**: in this case the `transport` will be `tcp`
* **amqps**: in this case the `transport` will be `ssl`
* **amqp+ssl**: in this case the `transport` will be `ssl`
* **amqp+tls**: in this case the `transport` will be `tls`

Examples:
* `amqp://localhost:5672`
* `amqps://user:[email protected]:5672`
* `amqp+tls://admin:[email protected]:5672`

#### Create connection

To create a connection, you have to import the `QueueModule.forRoot()` module into your application's root module. The `forRoot()`
Expand Down
1 change: 1 addition & 0 deletions src/exception/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './nest-amqp-invalid-connection-protocol.exception';
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export class NestAmqpInvalidConnectionProtocolException extends Error {
constructor(message: string) {
super(message);
}
}
34 changes: 34 additions & 0 deletions src/service/amqp/amqp.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Test, TestingModule } from '@nestjs/testing';
import MockInstance = jest.MockInstance;

jest.mock('rhea-promise');

Expand All @@ -7,6 +8,7 @@ import { Connection, ConnectionEvents, ReceiverEvents, SenderEvents } from 'rhea
import { EventContextMock } from '../../test/event-context.mock';
import { AMQP_CLIENT_TOKEN, QUEUE_MODULE_OPTIONS } from '../../constant';
import { QueueModuleOptions } from '../../interface';
import { NestAmqpInvalidConnectionProtocolException } from '../../exception';

import { AMQPService } from './amqp.service';

Expand All @@ -21,6 +23,10 @@ describe('AMQPService', () => {
let senderEvents: Array<{ event: SenderEvents; callback: (context: any) => any }> = [];
let connectionOpenMock: jest.Mock = jest.fn().mockResolvedValue(null);
const receiverEvents: Array<{ event: ReceiverEvents; callback: (context: any) => any }> = [];
const getLastMockCall = (obj: MockInstance<any, any>) => {
const mockCalls = obj.mock.calls;
return mockCalls[mockCalls.length - 1];
};

beforeAll(() => {
// mock the Connection constructor
Expand Down Expand Up @@ -90,6 +96,34 @@ describe('AMQPService', () => {
await expect(AMQPService.createConnection(null)).rejects.toThrow(/connection options must an object/);
});

describe('connection protocol', () => {
it('should work with amqp:// protocol', async () => {
await AMQPService.createConnection({ connectionUri: 'amqp://localhost:5672' });
expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'tcp' }));
});

it('should work with amqps:// protocol', async () => {
await AMQPService.createConnection({ connectionUri: 'amqps://localhost:5672' });
expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'ssl' }));
});

it('should work with amqp+ssl:// protocol', async () => {
await AMQPService.createConnection({ connectionUri: 'amqp+ssl://localhost:5672' });
expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'ssl' }));
});

it('should work with amqp+tls:// protocol', async () => {
await AMQPService.createConnection({ connectionUri: 'amqp+tls://localhost:5672' });
expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ transport: 'tls' }));
});

it('should throw error on unsupported protocol', async () => {
await expect(AMQPService.createConnection({ connectionUri: 'stomp://localhost:5672' })).rejects.toThrowError(
NestAmqpInvalidConnectionProtocolException,
);
});
});

describe('connection options', () => {
it('should not throw connection error by default', async () => {
connectionOpenMock = jest.fn().mockRejectedValue(new Error('Test'));
Expand Down
34 changes: 32 additions & 2 deletions src/service/amqp/amqp.service.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
import { Inject, Injectable } from '@nestjs/common';
import { EventEmitter } from 'events';
import { AwaitableSender, Connection, ConnectionEvents, EventContext, Receiver, ReceiverEvents, SenderEvents } from 'rhea-promise';
import {
AwaitableSender,
Connection,
ConnectionEvents,
ConnectionOptions,
EventContext,
Receiver,
ReceiverEvents,
SenderEvents,
} from 'rhea-promise';
import { URL } from 'url';

import { Logger } from '../../util';
import { QueueModuleOptions } from '../../interface';
import { AMQP_CLIENT_TOKEN, AMQP_CONNECTION_RECONNECT, QUEUE_MODULE_OPTIONS } from '../../constant';
import { NestAmqpInvalidConnectionProtocolException } from '../../exception';

type PropType<TObj, TProp extends keyof TObj> = TObj[TProp];

/**
* Can create a single connection and manage the senders and receivers for it.
Expand Down Expand Up @@ -52,10 +64,28 @@ export class AMQPService {
port,
});

let transport: PropType<ConnectionOptions, 'transport'>;
switch (protocol) {
case 'amqp:':
transport = 'tcp';
break;
case 'amqps:':
transport = 'ssl';
break;
case 'amqp+ssl:':
transport = 'ssl';
break;
case 'amqp+tls:':
transport = 'tls';
break;
default:
throw new NestAmqpInvalidConnectionProtocolException(`Not supported connection protocol: ${protocol}`);
}

const connection = new Connection({
password,
username,
transport: protocol === 'amqps:' ? 'ssl' : 'tcp',
transport,
host: hostname,
port: Number.parseInt(port, 10),
...rheaConnectionOptions,
Expand Down

0 comments on commit 1816233

Please sign in to comment.