Skip to content

Commit

Permalink
feat(listen): add support for Source in listener
Browse files Browse the repository at this point in the history
Fixes #49
  • Loading branch information
raschan committed Mar 29, 2022
1 parent 8205012 commit f9c563b
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 128 deletions.
20 changes: 15 additions & 5 deletions src/decorator/listen/listen.decorator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,29 @@ describe('@Listen', () => {

class Test {
@Listen(queueName, {})
public method1() {}
public method1() {
return;
}

@Listen(queueName, listenOptions)
public method2() {}
public method2() {
return;
}

@Listen(queueName, 'test-connection')
public method3() {}
public method3() {
return;
}

@Listen(queueName, listenOptions, 'test-connection')
public method4() {}
public method4() {
return;
}

@Listen(queueName)
public [Symbol.for('foo')]() {}
public [Symbol.for('foo')]() {
return;
}
}

beforeEach(() => {
Expand Down
37 changes: 18 additions & 19 deletions src/decorator/listen/listen.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { SetMetadata } from '@nestjs/common';
import { Source } from 'rhea-promise';
import { AMQP_DEFAULT_CONNECTION_TOKEN, QUEUE_LISTEN_METADATA_KEY } from '../../constant';

import { ListenerMetadata } from '../../domain';
import { ListenOptions } from '../../interface';

interface ListenOverload {
(source: string, connection?: string): MethodDecorator;
<T>(source: string, options: ListenOptions<T>, connection?: string): MethodDecorator;
(source: string | Source, connection?: string): MethodDecorator;
<T>(source: string | Source, options: ListenOptions<T>, connection?: string): MethodDecorator;
}

/**
Expand All @@ -21,31 +22,29 @@ interface ListenOverload {
* ```
*
* @param {string} source The name of the queue which will listen to.
* @param {ListenOptions<T>} [listenOptions={}] Options for the queue listening.
* @param {ListenOptions<T>} [optionsOrConnection={}] Options for the queue listening.
* @param {} [connectionName] Name of the connection the queue belongs.
*
* @public
*/
export const Listen: ListenOverload = <T>(
source: string,
listenOptions?: ListenOptions<T> | string,
source: string | Source,
optionsOrConnection?: ListenOptions<T> | string,
connectionName?: string,
): MethodDecorator => {
return (target: Record<string, any>, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
const metadata = new ListenerMetadata<T>();

const connection = connectionName ?? (typeof listenOptions === 'string' ? (listenOptions as string) : AMQP_DEFAULT_CONNECTION_TOKEN);
const options = typeof listenOptions === 'object' ? listenOptions : {};

metadata.source = source;
metadata.options = options;
metadata.connection = connection;

metadata.targetName = target.constructor.name;
metadata.target = target.constructor;

metadata.callback = descriptor.value;
metadata.callbackName = typeof propertyKey === 'string' ? propertyKey : propertyKey.toString();
const connection = connectionName ?? (typeof optionsOrConnection === 'string' ? optionsOrConnection : AMQP_DEFAULT_CONNECTION_TOKEN);
const options = typeof optionsOrConnection === 'object' ? optionsOrConnection : {};

const metadata = new ListenerMetadata<T>({
source,
options,
connection,
targetName: target.constructor.name,
target: target.constructor,
callback: descriptor.value,
callbackName: typeof propertyKey === 'string' ? propertyKey : propertyKey.toString(),
});

SetMetadata<string, ListenerMetadata<T>>(QUEUE_LISTEN_METADATA_KEY, metadata)(target, propertyKey, descriptor);
};
Expand Down
28 changes: 21 additions & 7 deletions src/domain/listener-metadata.domain.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Source } from 'rhea-promise';
import { ListenOptions } from '../interface';

/* eslint-disable @typescript-eslint/ban-types */
Expand All @@ -9,35 +10,48 @@ export class ListenerMetadata<T> {
/**
* The method that should be executed once the message is transformed (and validated if needed)
*/
public callback: any;
public readonly callback: any;

/**
* Name of the method
*/
public callbackName: string;
public readonly callbackName: string;

/**
* Name of the queue the handler will handle
*/
public source: string;
public readonly source: string | Source;

/**
* ListenOptions provided to the `@Listener` decorator
*/
public options: ListenOptions<T>;
public readonly options: ListenOptions<T>;

/**
* The name of Class the method belongs to
*/
public targetName: string;
public readonly targetName: string;

/**
* The Class the method belongs to
*/
public target: object;
public readonly target: object;

/**
* Connection the listener should be using
*/
public connection: string;
public readonly connection: string;

// istanbul ignore next
constructor(metadata: ListenerMetadata<T>) {
this.connection = metadata?.connection;
this.source = metadata?.source;
this.options = metadata?.options;

this.callback = metadata?.callback;
this.callbackName = metadata?.callbackName;

this.targetName = metadata?.targetName;
this.target = metadata?.target;
}
}
18 changes: 9 additions & 9 deletions src/queue.module.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('QueueModule', () => {
const moduleOptions: QueueModuleOptions = {
connectionUri,
};
const originalModuleProviders = (QueueModule as any).moduleDefinition.providers;
const originalModuleProviders = QueueModule['moduleDefinition'].providers;
let module: TestingModule;

@Injectable()
Expand Down Expand Up @@ -67,13 +67,13 @@ describe('QueueModule', () => {
class TestGlobalFeatureModule {}

afterEach(async () => {
((AMQConnectionOptionsStorage as any).storage as Map<string, any>).clear();
((AMQConnectionStorage as any).storage as Map<string, any>).clear();
AMQConnectionOptionsStorage['storage'].clear();
AMQConnectionStorage['storage'].clear();

await module?.close();

(QueueModule as any).moduleDefinition.imports = [];
(QueueModule as any).moduleDefinition.providers = originalModuleProviders;
QueueModule['moduleDefinition'].imports = [];
QueueModule['moduleDefinition'].providers = originalModuleProviders;
});

describe('forRoot()', () => {
Expand Down Expand Up @@ -178,7 +178,7 @@ describe('QueueModule', () => {

const forFeatureTestService = module.get<TestForFeatureService>(TestForFeatureService);

expect((forFeatureTestService.queueService as any).amqpService.getConnectionOptions()).toEqual(moduleOptions);
expect(forFeatureTestService.queueService['amqpService'].getConnectionOptions()).toEqual(moduleOptions);
});

it('should import as feature module, with module options for connection', async () => {
Expand All @@ -196,7 +196,7 @@ describe('QueueModule', () => {

const forFeatureTestService = module.get<TestForFeatureService>(TestForFeatureService);

expect((forFeatureTestService.queueService as any).amqpService.getConnectionOptions()).toEqual(moduleOptions);
expect(forFeatureTestService.queueService['amqpService'].getConnectionOptions()).toEqual(moduleOptions);
});
});

Expand Down Expand Up @@ -280,8 +280,8 @@ describe('QueueModule', () => {
Test.createTestingModule({
imports: [QueueModule.forRootAsync({})],
});
} catch (e) {
expect(e.message).toBe('Must provide factory, class or existing provider');
} catch (error) {
expect((error as Error).message).toEqual('Must provide factory, class or existing provider');
}
});
});
Expand Down
50 changes: 25 additions & 25 deletions src/service/amqp/amqp.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ describe('AMQPService', () => {
return mockCalls[mockCalls.length - 1];
};

const spyStorageSet = jest.spyOn((AMQConnectionStorage as any).storage, 'set');
const spyStorageGet = jest.spyOn((AMQConnectionStorage as any).storage, 'get');
const spyStorageSet = jest.spyOn(AMQConnectionStorage['storage'], 'set');
const spyStorageGet = jest.spyOn(AMQConnectionStorage['storage'], 'get');

beforeAll(() => {
// mock the Connection constructor
Expand All @@ -53,7 +53,7 @@ describe('AMQPService', () => {
linkOptions: options,
})),
_connection: {
dispatch: () => {},
dispatch: (): undefined => void 0,
},
}));
});
Expand All @@ -65,8 +65,8 @@ describe('AMQPService', () => {
receiverEvents = [];
moduleOptions = { connectionUri };

((AMQConnectionStorage as any).storage as Map<string, any>).clear();
((AMQConnectionOptionsStorage as any).storage as Map<string, any>).clear();
AMQConnectionStorage['storage'].clear();
AMQConnectionOptionsStorage['storage'].clear();
spyStorageSet.mockClear();
spyStorageGet.mockClear();

Expand All @@ -78,8 +78,8 @@ describe('AMQPService', () => {
},
{
provide: AMQP_CLIENT_TOKEN,
useFactory: async moduleOptions => {
connection = await AMQPService.createConnection(moduleOptions);
useFactory: async options => {
connection = await AMQPService.createConnection(options);

return connection;
},
Expand All @@ -105,17 +105,17 @@ describe('AMQPService', () => {
});

it('should create connection', async () => {
const connection = await AMQPService.createConnection({ connectionUri: connectionSecureUri });
const localConnection = await AMQPService.createConnection({ connectionUri: connectionSecureUri });

expect((connection as any).open).toHaveBeenCalled();
expect(localConnection.open).toHaveBeenCalled();
});

it('should create connection with special chars in username and password', async () => {
const username = 'Jörg';
const password = 'Gt|N#R=6$5(TE@rH"Pvc$7a';
const connectionUri = `amqps://${encodeURIComponent(username)}:${encodeURIComponent(password)}@localhost:5672`;
const localConnectionUri = `amqps://${encodeURIComponent(username)}:${encodeURIComponent(password)}@localhost:5672`;

await AMQPService.createConnection({ connectionUri });
await AMQPService.createConnection({ connectionUri: localConnectionUri });

expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ username, password }));
});
Expand Down Expand Up @@ -274,11 +274,11 @@ describe('AMQPService', () => {
});

it('should successfully disconnect', async () => {
const connection = module.get<Connection>(getAMQConnectionToken());
const localConnection = module.get<Connection>(getAMQConnectionToken());

await service.disconnect();

expect(connection.close).toBeCalled();
expect(localConnection.close).toBeCalled();
});

it('should create a sender', async () => {
Expand All @@ -300,15 +300,15 @@ describe('AMQPService', () => {
});

it('should create a receiver', async () => {
await service.createReceiver('queueName', 1, async () => {});
await service.createReceiver('queueName', 1, async () => void 0);

expect(receiverEvents.length).toBeGreaterThan(0);
});

it('should execute receiver events', async () => {
const context = new EventContextMock();
const spy = jest.spyOn(context.receiver, 'address', 'get');
await service.createReceiver('queueName', 1, async () => {});
await service.createReceiver('queueName', 1, async () => void 0);

receiverEvents.forEach(event => event.callback(context));

Expand All @@ -320,7 +320,7 @@ describe('AMQPService', () => {
it('should add credits', async () => {
const context = new EventContextMock();
const addCredits = 10;
await service.createReceiver('queueName', addCredits, async () => {});
await service.createReceiver('queueName', addCredits, async () => void 0);

receiverEvents.forEach(event => event.callback(context));

Expand Down Expand Up @@ -349,7 +349,7 @@ describe('AMQPService', () => {
const connectionName = 'testConnection';

beforeEach(() => {
((AMQConnectionStorage as any).storage as Map<string, any>).clear();
(AMQConnectionStorage['storage'] as Map<string, any>).clear();
spyStorageSet.mockClear();
spyStorageGet.mockClear();

Expand Down Expand Up @@ -389,14 +389,14 @@ describe('AMQPService', () => {
expect(senderEvents.length).toBeGreaterThan(0);
});

it('should throw error while trying to create sender on nonexistant connection', async () => {
it('should throw error while trying to create sender on nonexistent connection', async () => {
await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName);

try {
await service.createSender('testQueue', 'nonExisting');
expect.assertions(1);
} catch (e) {
expect(e.message).toBe('No connection found for name nonExisting');
} catch (error) {
expect((error as Error).message).toBe('No connection found for name nonExisting');
}

expect(spyStorageGet.mock.calls.length).toEqual(1);
Expand All @@ -407,19 +407,19 @@ describe('AMQPService', () => {
it('should create receiver on connection', async () => {
await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName);

await service.createReceiver('testQueue', 1, async () => {}, connectionName);
await service.createReceiver('testQueue', 1, async () => void 0, connectionName);

expect(receiverEvents.length).toBeGreaterThan(0);
});

it('should throw error while trying to create receiver on nonexistant connection', async () => {
it('should throw error while trying to create receiver on nonexistent connection', async () => {
await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName);

try {
await service.createReceiver('testQueue', 1, async () => {}, 'nonExisting');
await service.createReceiver('testQueue', 1, async () => void 0, 'nonExisting');
expect.assertions(1);
} catch (e) {
expect(e.message).toBe('No connection found for name nonExisting');
} catch (error) {
expect((error as Error).message).toBe('No connection found for name nonExisting');
}

expect(spyStorageGet.mock.calls.length).toEqual(1);
Expand Down
Loading

0 comments on commit f9c563b

Please sign in to comment.