diff --git a/dist/consumer.js b/dist/consumer.js index c579da16..2f4ff975 100644 --- a/dist/consumer.js +++ b/dist/consumer.js @@ -127,13 +127,13 @@ class Consumer extends events_1.EventEmitter { this.freeConcurrentSlots = newFreeConcurrentSlots; } async reportMessageFromBatchFinished(message, error) { - debug('Message from batch has finised'); + debug('Message from batch has finished'); this.freeConcurrentSlots++; try { if (error) throw error; await this.deleteMessage(message); - this.emit('message_processed', message); + this.emit('message_processed', message, this.queueUrl); } catch (err) { this.emitError(err, message); @@ -166,19 +166,19 @@ class Consumer extends events_1.EventEmitter { else { await Promise.all(response.Messages.map(this.processMessage)); } - this.emit('response_processed'); + this.emit('response_processed', this.queueUrl); } else { - this.emit('empty'); + this.emit('empty', this.queueUrl); } } } async processMessage(message) { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); try { await this.executeHandler(message); await this.deleteMessage(message); - this.emit('message_processed', message); + this.emit('message_processed', message, this.queueUrl); } catch (err) { this.emitError(err, message); @@ -261,7 +261,7 @@ class Consumer extends events_1.EventEmitter { } poll() { if (this.stopped) { - this.emit('stopped'); + this.emit('stopped', this.queueUrl); return; } const pollBatchSize = Math.min(this.batchSize, this.freeConcurrentSlots); @@ -310,7 +310,7 @@ class Consumer extends events_1.EventEmitter { } async processMessageBatch(messages) { messages.forEach((message) => { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); }); this.reportNumberOfMessagesReceived(messages.length); const batchUuid = generateUuid(); diff --git a/src/consumer.ts b/src/consumer.ts index 23f2c9e1..ed7705d4 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -208,7 +208,7 @@ export class Consumer extends EventEmitter { } public async reportMessageFromBatchFinished(message: SQSMessage, error: Error): Promise { - debug('Message from batch has finised'); + debug('Message from batch has finished'); this.freeConcurrentSlots++; @@ -216,7 +216,7 @@ export class Consumer extends EventEmitter { if (error) throw error; await this.deleteMessage(message); - this.emit('message_processed', message); + this.emit('message_processed', message, this.queueUrl); } catch (err) { this.emitError(err, message); } @@ -252,20 +252,20 @@ export class Consumer extends EventEmitter { } else { await Promise.all(response.Messages.map(this.processMessage)); } - this.emit('response_processed'); + this.emit('response_processed', this.queueUrl); } else { - this.emit('empty'); + this.emit('empty', this.queueUrl); } } } private async processMessage(message: SQSMessage): Promise { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); try { await this.executeHandler(message); await this.deleteMessage(message); - this.emit('message_processed', message); + this.emit('message_processed', message, this.queueUrl); } catch (err) { this.emitError(err, message); @@ -347,7 +347,7 @@ export class Consumer extends EventEmitter { private poll(): void { if (this.stopped) { - this.emit('stopped'); + this.emit('stopped', this.queueUrl); return; } @@ -402,7 +402,7 @@ export class Consumer extends EventEmitter { private async processMessageBatch(messages: SQSMessage[]): Promise { messages.forEach((message) => { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); }); this.reportNumberOfMessagesReceived(messages.length);