From f9c563be7e44b7f38e5225dd3af7738e34446217 Mon Sep 17 00:00:00 2001 From: Marcell Huszti Date: Tue, 29 Mar 2022 16:37:11 +0200 Subject: [PATCH] feat(listen): add support for Source in listener Fixes #49 --- src/decorator/listen/listen.decorator.spec.ts | 20 +++-- src/decorator/listen/listen.decorator.ts | 37 +++++----- src/domain/listener-metadata.domain.ts | 28 +++++-- src/queue.module.spec.ts | 18 ++--- src/service/amqp/amqp.service.spec.ts | 50 ++++++------- src/service/amqp/amqp.service.ts | 11 ++- src/service/queue/queue.service.spec.ts | 73 +++++++++++-------- src/service/queue/queue.service.ts | 57 ++++++++------- 8 files changed, 166 insertions(+), 128 deletions(-) diff --git a/src/decorator/listen/listen.decorator.spec.ts b/src/decorator/listen/listen.decorator.spec.ts index 75ffc3d..1a21f5a 100644 --- a/src/decorator/listen/listen.decorator.spec.ts +++ b/src/decorator/listen/listen.decorator.spec.ts @@ -9,19 +9,29 @@ describe('@Listen', () => { class Test { @Listen(queueName, {}) - public method1() {} + public method1() { + return; + } @Listen(queueName, listenOptions) - public method2() {} + public method2() { + return; + } @Listen(queueName, 'test-connection') - public method3() {} + public method3() { + return; + } @Listen(queueName, listenOptions, 'test-connection') - public method4() {} + public method4() { + return; + } @Listen(queueName) - public [Symbol.for('foo')]() {} + public [Symbol.for('foo')]() { + return; + } } beforeEach(() => { diff --git a/src/decorator/listen/listen.decorator.ts b/src/decorator/listen/listen.decorator.ts index 6b133c8..4aa0782 100644 --- a/src/decorator/listen/listen.decorator.ts +++ b/src/decorator/listen/listen.decorator.ts @@ -1,12 +1,13 @@ import { SetMetadata } from '@nestjs/common'; +import { Source } from 'rhea-promise'; import { AMQP_DEFAULT_CONNECTION_TOKEN, QUEUE_LISTEN_METADATA_KEY } from '../../constant'; import { ListenerMetadata } from '../../domain'; import { ListenOptions } from '../../interface'; interface ListenOverload { - (source: string, connection?: string): MethodDecorator; - (source: string, options: ListenOptions, connection?: string): MethodDecorator; + (source: string | Source, connection?: string): MethodDecorator; + (source: string | Source, options: ListenOptions, connection?: string): MethodDecorator; } /** @@ -21,31 +22,29 @@ interface ListenOverload { * ``` * * @param {string} source The name of the queue which will listen to. - * @param {ListenOptions} [listenOptions={}] Options for the queue listening. + * @param {ListenOptions} [optionsOrConnection={}] Options for the queue listening. * @param {} [connectionName] Name of the connection the queue belongs. * * @public */ export const Listen: ListenOverload = ( - source: string, - listenOptions?: ListenOptions | string, + source: string | Source, + optionsOrConnection?: ListenOptions | string, connectionName?: string, ): MethodDecorator => { return (target: Record, propertyKey: string | symbol, descriptor: PropertyDescriptor) => { - const metadata = new ListenerMetadata(); - - const connection = connectionName ?? (typeof listenOptions === 'string' ? (listenOptions as string) : AMQP_DEFAULT_CONNECTION_TOKEN); - const options = typeof listenOptions === 'object' ? listenOptions : {}; - - metadata.source = source; - metadata.options = options; - metadata.connection = connection; - - metadata.targetName = target.constructor.name; - metadata.target = target.constructor; - - metadata.callback = descriptor.value; - metadata.callbackName = typeof propertyKey === 'string' ? propertyKey : propertyKey.toString(); + const connection = connectionName ?? (typeof optionsOrConnection === 'string' ? optionsOrConnection : AMQP_DEFAULT_CONNECTION_TOKEN); + const options = typeof optionsOrConnection === 'object' ? optionsOrConnection : {}; + + const metadata = new ListenerMetadata({ + source, + options, + connection, + targetName: target.constructor.name, + target: target.constructor, + callback: descriptor.value, + callbackName: typeof propertyKey === 'string' ? propertyKey : propertyKey.toString(), + }); SetMetadata>(QUEUE_LISTEN_METADATA_KEY, metadata)(target, propertyKey, descriptor); }; diff --git a/src/domain/listener-metadata.domain.ts b/src/domain/listener-metadata.domain.ts index 8633af2..e948931 100644 --- a/src/domain/listener-metadata.domain.ts +++ b/src/domain/listener-metadata.domain.ts @@ -1,3 +1,4 @@ +import { Source } from 'rhea-promise'; import { ListenOptions } from '../interface'; /* eslint-disable @typescript-eslint/ban-types */ @@ -9,35 +10,48 @@ export class ListenerMetadata { /** * The method that should be executed once the message is transformed (and validated if needed) */ - public callback: any; + public readonly callback: any; /** * Name of the method */ - public callbackName: string; + public readonly callbackName: string; /** * Name of the queue the handler will handle */ - public source: string; + public readonly source: string | Source; /** * ListenOptions provided to the `@Listener` decorator */ - public options: ListenOptions; + public readonly options: ListenOptions; /** * The name of Class the method belongs to */ - public targetName: string; + public readonly targetName: string; /** * The Class the method belongs to */ - public target: object; + public readonly target: object; /** * Connection the listener should be using */ - public connection: string; + public readonly connection: string; + + // istanbul ignore next + constructor(metadata: ListenerMetadata) { + this.connection = metadata?.connection; + this.source = metadata?.source; + this.options = metadata?.options; + + this.callback = metadata?.callback; + this.callbackName = metadata?.callbackName; + + this.targetName = metadata?.targetName; + this.target = metadata?.target; + } } diff --git a/src/queue.module.spec.ts b/src/queue.module.spec.ts index d04a39d..c1ef11c 100644 --- a/src/queue.module.spec.ts +++ b/src/queue.module.spec.ts @@ -14,7 +14,7 @@ describe('QueueModule', () => { const moduleOptions: QueueModuleOptions = { connectionUri, }; - const originalModuleProviders = (QueueModule as any).moduleDefinition.providers; + const originalModuleProviders = QueueModule['moduleDefinition'].providers; let module: TestingModule; @Injectable() @@ -67,13 +67,13 @@ describe('QueueModule', () => { class TestGlobalFeatureModule {} afterEach(async () => { - ((AMQConnectionOptionsStorage as any).storage as Map).clear(); - ((AMQConnectionStorage as any).storage as Map).clear(); + AMQConnectionOptionsStorage['storage'].clear(); + AMQConnectionStorage['storage'].clear(); await module?.close(); - (QueueModule as any).moduleDefinition.imports = []; - (QueueModule as any).moduleDefinition.providers = originalModuleProviders; + QueueModule['moduleDefinition'].imports = []; + QueueModule['moduleDefinition'].providers = originalModuleProviders; }); describe('forRoot()', () => { @@ -178,7 +178,7 @@ describe('QueueModule', () => { const forFeatureTestService = module.get(TestForFeatureService); - expect((forFeatureTestService.queueService as any).amqpService.getConnectionOptions()).toEqual(moduleOptions); + expect(forFeatureTestService.queueService['amqpService'].getConnectionOptions()).toEqual(moduleOptions); }); it('should import as feature module, with module options for connection', async () => { @@ -196,7 +196,7 @@ describe('QueueModule', () => { const forFeatureTestService = module.get(TestForFeatureService); - expect((forFeatureTestService.queueService as any).amqpService.getConnectionOptions()).toEqual(moduleOptions); + expect(forFeatureTestService.queueService['amqpService'].getConnectionOptions()).toEqual(moduleOptions); }); }); @@ -280,8 +280,8 @@ describe('QueueModule', () => { Test.createTestingModule({ imports: [QueueModule.forRootAsync({})], }); - } catch (e) { - expect(e.message).toBe('Must provide factory, class or existing provider'); + } catch (error) { + expect((error as Error).message).toEqual('Must provide factory, class or existing provider'); } }); }); diff --git a/src/service/amqp/amqp.service.spec.ts b/src/service/amqp/amqp.service.spec.ts index a05142e..db878dd 100644 --- a/src/service/amqp/amqp.service.spec.ts +++ b/src/service/amqp/amqp.service.spec.ts @@ -32,8 +32,8 @@ describe('AMQPService', () => { return mockCalls[mockCalls.length - 1]; }; - const spyStorageSet = jest.spyOn((AMQConnectionStorage as any).storage, 'set'); - const spyStorageGet = jest.spyOn((AMQConnectionStorage as any).storage, 'get'); + const spyStorageSet = jest.spyOn(AMQConnectionStorage['storage'], 'set'); + const spyStorageGet = jest.spyOn(AMQConnectionStorage['storage'], 'get'); beforeAll(() => { // mock the Connection constructor @@ -53,7 +53,7 @@ describe('AMQPService', () => { linkOptions: options, })), _connection: { - dispatch: () => {}, + dispatch: (): undefined => void 0, }, })); }); @@ -65,8 +65,8 @@ describe('AMQPService', () => { receiverEvents = []; moduleOptions = { connectionUri }; - ((AMQConnectionStorage as any).storage as Map).clear(); - ((AMQConnectionOptionsStorage as any).storage as Map).clear(); + AMQConnectionStorage['storage'].clear(); + AMQConnectionOptionsStorage['storage'].clear(); spyStorageSet.mockClear(); spyStorageGet.mockClear(); @@ -78,8 +78,8 @@ describe('AMQPService', () => { }, { provide: AMQP_CLIENT_TOKEN, - useFactory: async moduleOptions => { - connection = await AMQPService.createConnection(moduleOptions); + useFactory: async options => { + connection = await AMQPService.createConnection(options); return connection; }, @@ -105,17 +105,17 @@ describe('AMQPService', () => { }); it('should create connection', async () => { - const connection = await AMQPService.createConnection({ connectionUri: connectionSecureUri }); + const localConnection = await AMQPService.createConnection({ connectionUri: connectionSecureUri }); - expect((connection as any).open).toHaveBeenCalled(); + expect(localConnection.open).toHaveBeenCalled(); }); it('should create connection with special chars in username and password', async () => { const username = 'Jörg'; const password = 'Gt|N#R=6$5(TE@rH"Pvc$7a'; - const connectionUri = `amqps://${encodeURIComponent(username)}:${encodeURIComponent(password)}@localhost:5672`; + const localConnectionUri = `amqps://${encodeURIComponent(username)}:${encodeURIComponent(password)}@localhost:5672`; - await AMQPService.createConnection({ connectionUri }); + await AMQPService.createConnection({ connectionUri: localConnectionUri }); expect(getLastMockCall(Connection as any)[0]).toEqual(expect.objectContaining({ username, password })); }); @@ -274,11 +274,11 @@ describe('AMQPService', () => { }); it('should successfully disconnect', async () => { - const connection = module.get(getAMQConnectionToken()); + const localConnection = module.get(getAMQConnectionToken()); await service.disconnect(); - expect(connection.close).toBeCalled(); + expect(localConnection.close).toBeCalled(); }); it('should create a sender', async () => { @@ -300,7 +300,7 @@ describe('AMQPService', () => { }); it('should create a receiver', async () => { - await service.createReceiver('queueName', 1, async () => {}); + await service.createReceiver('queueName', 1, async () => void 0); expect(receiverEvents.length).toBeGreaterThan(0); }); @@ -308,7 +308,7 @@ describe('AMQPService', () => { it('should execute receiver events', async () => { const context = new EventContextMock(); const spy = jest.spyOn(context.receiver, 'address', 'get'); - await service.createReceiver('queueName', 1, async () => {}); + await service.createReceiver('queueName', 1, async () => void 0); receiverEvents.forEach(event => event.callback(context)); @@ -320,7 +320,7 @@ describe('AMQPService', () => { it('should add credits', async () => { const context = new EventContextMock(); const addCredits = 10; - await service.createReceiver('queueName', addCredits, async () => {}); + await service.createReceiver('queueName', addCredits, async () => void 0); receiverEvents.forEach(event => event.callback(context)); @@ -349,7 +349,7 @@ describe('AMQPService', () => { const connectionName = 'testConnection'; beforeEach(() => { - ((AMQConnectionStorage as any).storage as Map).clear(); + (AMQConnectionStorage['storage'] as Map).clear(); spyStorageSet.mockClear(); spyStorageGet.mockClear(); @@ -389,14 +389,14 @@ describe('AMQPService', () => { expect(senderEvents.length).toBeGreaterThan(0); }); - it('should throw error while trying to create sender on nonexistant connection', async () => { + it('should throw error while trying to create sender on nonexistent connection', async () => { await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName); try { await service.createSender('testQueue', 'nonExisting'); expect.assertions(1); - } catch (e) { - expect(e.message).toBe('No connection found for name nonExisting'); + } catch (error) { + expect((error as Error).message).toBe('No connection found for name nonExisting'); } expect(spyStorageGet.mock.calls.length).toEqual(1); @@ -407,19 +407,19 @@ describe('AMQPService', () => { it('should create receiver on connection', async () => { await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName); - await service.createReceiver('testQueue', 1, async () => {}, connectionName); + await service.createReceiver('testQueue', 1, async () => void 0, connectionName); expect(receiverEvents.length).toBeGreaterThan(0); }); - it('should throw error while trying to create receiver on nonexistant connection', async () => { + it('should throw error while trying to create receiver on nonexistent connection', async () => { await AMQPService.createConnection({ connectionUri: connectionSecureUri }, connectionName); try { - await service.createReceiver('testQueue', 1, async () => {}, 'nonExisting'); + await service.createReceiver('testQueue', 1, async () => void 0, 'nonExisting'); expect.assertions(1); - } catch (e) { - expect(e.message).toBe('No connection found for name nonExisting'); + } catch (error) { + expect((error as Error).message).toBe('No connection found for name nonExisting'); } expect(spyStorageGet.mock.calls.length).toEqual(1); diff --git a/src/service/amqp/amqp.service.ts b/src/service/amqp/amqp.service.ts index 7b2a772..56ec8dd 100644 --- a/src/service/amqp/amqp.service.ts +++ b/src/service/amqp/amqp.service.ts @@ -9,6 +9,7 @@ import { Receiver, ReceiverEvents, SenderEvents, + Source, } from 'rhea-promise'; import { URL } from 'url'; @@ -130,7 +131,7 @@ export class AMQPService { try { await connection.open(); } catch (error) { - logger.error(`connection error: ${error.message}`, error); + logger.error(`connection error: ${(error as Error).message}`, (error as Error).stack); if (throwExceptionOnConnectionError === true) { throw error; @@ -143,8 +144,6 @@ export class AMQPService { return connection; } - constructor() {} - /** * Returns the connection object with which the AMQP connection was created. * @@ -213,7 +212,7 @@ export class AMQPService { /** * Creates a receiver object which will send the message to the given queue. * - * @param {string} queueName Name of the queue. + * @param {string} source Name of the queue. * @param {number} credits How many message can be processed parallel. * @param {function(context: EventContext): Promise} onMessage Function what will be invoked when a message arrives. * @param {string} [connectionName] Name of the connection the receiver is on @@ -221,7 +220,7 @@ export class AMQPService { * @return {Receiver} Receiver. */ public async createReceiver( - queueName: string, + source: string | Source, credits: number, onMessage: (context: EventContext) => Promise, connectionName: string = AMQP_DEFAULT_CONNECTION_TOKEN, @@ -243,7 +242,7 @@ export class AMQPService { const receiver: Receiver = await connection.createReceiver({ onError, onMessage, - source: queueName, + source, autoaccept: false, credit_window: 0, }); diff --git a/src/service/queue/queue.service.spec.ts b/src/service/queue/queue.service.spec.ts index a987cf6..63cedcb 100644 --- a/src/service/queue/queue.service.spec.ts +++ b/src/service/queue/queue.service.spec.ts @@ -1,5 +1,5 @@ import { Test, TestingModule } from '@nestjs/testing'; -import { AwaitableSender, EventContext, Receiver } from 'rhea-promise'; +import { AwaitableSender, EventContext, Receiver, Source, filter } from 'rhea-promise'; import { Expose } from 'class-transformer'; import { IsString } from 'class-validator'; @@ -32,8 +32,8 @@ describe('QueueService', () => { const getReceiver = (service: QueueService, queueName: string, connectionName: string): Receiver => { return (service as any).receivers.get((service as any).getLinkToken(queueName, connectionName)); }; - const getMessageHandler = (amqpService: AMQPService): ((context: EventContext) => Promise) => { - return (amqpService.createReceiver as any).mock.calls[0][2]; + const getMessageHandler = (service: AMQPService): ((context: EventContext) => Promise) => { + return (service.createReceiver as any).mock.calls[0][2]; }; const getInternallyCreatedMessageControl = (): MessageControl => { return (MessageControl as jest.Mock).mock.instances[0]; @@ -81,19 +81,34 @@ describe('QueueService', () => { expect(queueService).toBeDefined(); }); - it('should listen to a queue', async () => { + it('should listen to a queue given by the queue name', async () => { const spy = jest.spyOn(queueService, 'getReceiver' as any); await queueService.listen(defaultQueue, () => void 0, {}); expect(spy).toBeCalled(); + expect(spy.mock.calls[0][0]).toEqual('test'); + }); + + it('should listen to a queue given by a Source object', async () => { + const spy = jest.spyOn(queueService, 'getReceiver' as any); + + const source: Source = { + address: defaultQueue, + filter: filter.selector("(JMSDeliveryMode = 'PERSISTENT') OR (JMSCorrelationID) <> ''"), + }; + + await queueService.listen(source, () => void 0, {}); + + expect(spy).toBeCalled(); + expect(spy.mock.calls[0][0]).toEqual(source); }); describe('receiver', () => { it('should create a receiver', async () => { await queueService.listen(defaultQueue, () => void 0, {}); - expect((queueService as any).receivers.size).toBe(1); + expect(queueService['receivers'].size).toBe(1); }); describe('message handling', () => { @@ -336,25 +351,25 @@ describe('QueueService', () => { }); it('should clear links', async () => { - await (queueService as any).getSender('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getSender']('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); queueService.clearSenderAndReceiverLinks(); - expect((queueService as any).senders.size).toBe(0); + expect(queueService['senders'].size).toBe(0); }); describe('getReceiver()', () => { it('should create receiver if not exists yet', async () => { - await (queueService as any).getReceiver('queueName', 1, async () => {}, AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getReceiver']('queueName', 1, async () => void 0, AMQP_DEFAULT_CONNECTION_TOKEN); - expect((queueService as any).receivers.size).toBe(1); + expect(queueService['receivers'].size).toBe(1); }); it('should not create an existing receiver', async () => { - await (queueService as any).getReceiver('queueName', 1, async () => {}, AMQP_DEFAULT_CONNECTION_TOKEN); - await (queueService as any).getReceiver('queueName', 1, async () => {}, AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getReceiver']('queueName', 1, async () => void 0, AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getReceiver']('queueName', 1, async () => void 0, AMQP_DEFAULT_CONNECTION_TOKEN); - expect((queueService as any).receivers.size).toBe(1); + expect(queueService['receivers'].size).toBe(1); }); it('should create different receivers for the same queue name but on different connections', async () => { @@ -364,25 +379,25 @@ describe('QueueService', () => { B = 'B', } - await (queueService as any).getReceiver(queue, 1, async () => {}, connection.A); - await (queueService as any).getReceiver(queue, 1, async () => {}, connection.B); + await queueService['getReceiver'](queue, 1, async () => void 0, connection.A); + await queueService['getReceiver'](queue, 1, async () => void 0, connection.B); - expect((queueService as any).receivers.size).toBe(2); + expect(queueService['receivers'].size).toBe(2); }); }); describe('getSender()', () => { it('should create sender if not exists yet', async () => { - await (queueService as any).getSender('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getSender']('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); - expect((queueService as any).senders.size).toBe(1); + expect(queueService['senders'].size).toBe(1); }); it('should not create an existing sender', async () => { - await (queueService as any).getSender('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); - await (queueService as any).getSender('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getSender']('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); + await queueService['getSender']('queueName', AMQP_DEFAULT_CONNECTION_TOKEN); - expect((queueService as any).senders.size).toBe(1); + expect(queueService['senders'].size).toBe(1); }); it('should create different senders for the same queue name but on different connections', async () => { @@ -392,50 +407,50 @@ describe('QueueService', () => { B = 'B', } - await (queueService as any).getSender(queue, connection.A); - await (queueService as any).getSender(queue, connection.B); + await queueService['getSender'](queue, connection.A); + await queueService['getSender'](queue, connection.B); - expect((queueService as any).senders.size).toBe(2); + expect(queueService['senders'].size).toBe(2); }); }); it('should encode message', () => { const obj = { name: 'Peter' }; - const result = (queueService as any).encodeMessage(obj); + const result = queueService['encodeMessage'](obj); expect(result).toEqual(JSON.stringify(obj)); }); describe('decodeMessage()', () => { it('should decode Buffer', () => { - const result = (queueService as any).decodeMessage(Buffer.from('{}')); + const result = queueService['decodeMessage'](Buffer.from('{}')); expect(result).toEqual({}); }); it('should with the argument itself if it is an object', () => { const obj = { name: 'Peter' }; - const result = (queueService as any).decodeMessage(obj); + const result = queueService['decodeMessage'](obj); expect(result).toBe(obj); }); it('should decode not object but valid values', () => { - const result = (queueService as any).decodeMessage('false'); + const result = queueService['decodeMessage']('false'); expect(result).toEqual(false); }); it('should decode valid objects', () => { const obj = { a: 1, b: { c: 2 } }; - const result = (queueService as any).decodeMessage(JSON.stringify(obj)); + const result = queueService['decodeMessage'](JSON.stringify(obj)); expect(result).toEqual(obj); }); it('should throw error on invalid objects', () => { expect(() => { - (queueService as any).decodeMessage('{null}'); + queueService['decodeMessage']('{null}'); }).toThrowError(SyntaxError); }); }); diff --git a/src/service/queue/queue.service.ts b/src/service/queue/queue.service.ts index 8005389..6def4de 100644 --- a/src/service/queue/queue.service.ts +++ b/src/service/queue/queue.service.ts @@ -1,5 +1,5 @@ import { Injectable } from '@nestjs/common'; -import { AwaitableSender, Delivery, EventContext, Message, Receiver } from 'rhea-promise'; +import { AwaitableSender, Delivery, EventContext, Message, Receiver, Source } from 'rhea-promise'; import { extendObject, @@ -47,18 +47,20 @@ export class QueueService { * @public */ public async listen( - queueName: string, + source: string | Source, callback: (object: T, control: MessageControl) => Promise, options: ListenOptions, connection: string = AMQP_DEFAULT_CONNECTION_TOKEN, ): Promise { + const sourceToken = typeof source === 'string' ? source : source.address; + // get receiver const initialCredit = !!options && options.parallelMessageProcessing ? options.parallelMessageProcessing : PARALLEL_MESSAGE_COUNT; const transformerOptions = !!options && options.transformerOptions ? options.transformerOptions : {}; const validatorOptions = !!options && options.validatorOptions ? options.validatorOptions : {}; const messageValidator = async (context: EventContext, control: MessageControl) => { - logger.verbose(`incoming message on queue '${queueName}'`); + logger.verbose(`incoming message on queue '${sourceToken}'`); const body: any = context.message.body; let object: T; @@ -66,9 +68,8 @@ export class QueueService { // if not expecting parsed data if (!options || options.type === null || options.type === undefined) { object = null; - } - // if expecting parsed data - else { + } else { + // if expecting parsed data let parsed: any; // parse body received as string from queue @@ -78,7 +79,7 @@ export class QueueService { logger.error('cant decode message', body); // can't decode, need to reject message - control.reject(error.message); + control.reject((error as Error).message); return; } @@ -112,20 +113,20 @@ export class QueueService { // istanbul ignore else if (error instanceof ValidationException) { - logger.error(`validation error ${queueName} (payload: ${JSON.stringify(parsed)}): ${error.message}`, error.stack); + logger.error(`validation error ${sourceToken} (payload: ${JSON.stringify(parsed)}): ${error.message}`, error.stack); } else { - const parsedError = tryParseJSON(error.message) || error.message; + const parsedError = tryParseJSON((error as Error).message) || (error as Error).message; logger.error( - `unexpected error happened during validation process on '${queueName}' (payload: ${JSON.stringify( + `unexpected error happened during validation process on '${sourceToken}' (payload: ${JSON.stringify( parsed, )}): ${parsedError.toString()}`, - error, + (error as Error).stack, ); } // can't validate, need to reject message - control.reject(error.message); + control.reject((error as Error).message); return; } @@ -136,7 +137,7 @@ export class QueueService { const startTime = new Date(); await callback(object, control); const durationInMs = new Date().getTime() - startTime.getTime(); - logger.log(`handling '${queueName}' finished in ${durationInMs} (ms)`); + logger.log(`handling '${sourceToken}' finished in ${durationInMs} (ms)`); // handle auto-accept when message is otherwise not handled // istanbul ignore next @@ -144,13 +145,13 @@ export class QueueService { control.accept(); } } catch (error) { - logger.error(`error in callback on queue '${queueName}': ${error.message}`, error); + logger.error(`error in callback on queue '${sourceToken}': ${(error as Error).message}`, (error as Error).stack); // can't process callback, need to reject message - control.reject(error.message); + control.reject((error as Error).message); } - logger.verbose(`handled message on queue '${queueName}'`); + logger.verbose(`handled message on queue '${sourceToken}'`); }; const messageHandler = async (context: EventContext) => { @@ -161,7 +162,7 @@ export class QueueService { control.reject(error.message); }); }; - await this.getReceiver(queueName, initialCredit, messageHandler, connection); + await this.getReceiver(source, initialCredit, messageHandler, connection); } /** @@ -295,19 +296,21 @@ export class QueueService { } private async getReceiver( - queueName: string, + source: string | Source, credit: number, messageHandler: (context: EventContext) => Promise, connection: string, ): Promise { let receiver; - const receiverToken = this.getLinkToken(queueName, connection); + const sourceToken = typeof source === 'string' ? source : JSON.stringify(source); + + const receiverToken = this.getLinkToken(sourceToken, connection); if (this.receivers.has(receiverToken)) { receiver = this.receivers.get(receiverToken); } else { - receiver = await this.amqpService.createReceiver(queueName, credit, messageHandler.bind(this), connection); + receiver = await this.amqpService.createReceiver(source, credit, messageHandler.bind(this), connection); this.receivers.set(receiverToken, receiver); } @@ -315,15 +318,15 @@ export class QueueService { return receiver; } - private async getSender(queueName: string, connection: string): Promise { + private async getSender(target: string, connection: string): Promise { let sender; - const senderToken = this.getLinkToken(queueName, connection); + const senderToken = this.getLinkToken(target, connection); if (this.senders.has(senderToken)) { sender = this.senders.get(senderToken); } else { - sender = await this.amqpService.createSender(queueName, connection); + sender = await this.amqpService.createSender(target, connection); this.senders.set(senderToken, sender); } @@ -341,13 +344,11 @@ export class QueueService { } const objectLike: string = message instanceof Buffer ? message.toString() : (message as string); - const object = JSON.parse(objectLike); - - return object; + return JSON.parse(objectLike); } - private getLinkToken(queue: string, connection: string): string { - return `${connection}:${queue}`; + private getLinkToken(sourceToken: string, connection: string): string { + return `${connection}:${sourceToken}`; } }