Skip to content

Commit

Permalink
Merge pull request #18 from DaPulse/feature/moshik/pass_queue_url_on_…
Browse files Browse the repository at this point in the history
…all_events

add queueUrl to all emitted events data
  • Loading branch information
MoshikEilon authored Apr 10, 2022
2 parents 6efd3b4 + 647e2a2 commit df63e73
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 16 deletions.
16 changes: 8 additions & 8 deletions dist/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ class Consumer extends events_1.EventEmitter {
this.freeConcurrentSlots = newFreeConcurrentSlots;
}
async reportMessageFromBatchFinished(message, error) {
debug('Message from batch has finised');
debug('Message from batch has finished');
this.freeConcurrentSlots++;
try {
if (error)
throw error;
await this.deleteMessage(message);
this.emit('message_processed', message);
this.emit('message_processed', message, this.queueUrl);
}
catch (err) {
this.emitError(err, message);
Expand Down Expand Up @@ -166,19 +166,19 @@ class Consumer extends events_1.EventEmitter {
else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
this.emit('response_processed', this.queueUrl);
}
else {
this.emit('empty');
this.emit('empty', this.queueUrl);
}
}
}
async processMessage(message) {
this.emit('message_received', message);
this.emit('message_received', message, this.queueUrl);
try {
await this.executeHandler(message);
await this.deleteMessage(message);
this.emit('message_processed', message);
this.emit('message_processed', message, this.queueUrl);
}
catch (err) {
this.emitError(err, message);
Expand Down Expand Up @@ -261,7 +261,7 @@ class Consumer extends events_1.EventEmitter {
}
poll() {
if (this.stopped) {
this.emit('stopped');
this.emit('stopped', this.queueUrl);
return;
}
const pollBatchSize = Math.min(this.batchSize, this.freeConcurrentSlots);
Expand Down Expand Up @@ -310,7 +310,7 @@ class Consumer extends events_1.EventEmitter {
}
async processMessageBatch(messages) {
messages.forEach((message) => {
this.emit('message_received', message);
this.emit('message_received', message, this.queueUrl);
});
this.reportNumberOfMessagesReceived(messages.length);
const batchUuid = generateUuid();
Expand Down
16 changes: 8 additions & 8 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ export class Consumer extends EventEmitter {
}

public async reportMessageFromBatchFinished(message: SQSMessage, error: Error): Promise<void> {
debug('Message from batch has finised');
debug('Message from batch has finished');

this.freeConcurrentSlots++;

try {
if (error) throw error;

await this.deleteMessage(message);
this.emit('message_processed', message);
this.emit('message_processed', message, this.queueUrl);
} catch (err) {
this.emitError(err, message);
}
Expand Down Expand Up @@ -252,20 +252,20 @@ export class Consumer extends EventEmitter {
} else {
await Promise.all(response.Messages.map(this.processMessage));
}
this.emit('response_processed');
this.emit('response_processed', this.queueUrl);
} else {
this.emit('empty');
this.emit('empty', this.queueUrl);
}
}
}

private async processMessage(message: SQSMessage): Promise<void> {
this.emit('message_received', message);
this.emit('message_received', message, this.queueUrl);

try {
await this.executeHandler(message);
await this.deleteMessage(message);
this.emit('message_processed', message);
this.emit('message_processed', message, this.queueUrl);
} catch (err) {
this.emitError(err, message);

Expand Down Expand Up @@ -347,7 +347,7 @@ export class Consumer extends EventEmitter {

private poll(): void {
if (this.stopped) {
this.emit('stopped');
this.emit('stopped', this.queueUrl);
return;
}

Expand Down Expand Up @@ -402,7 +402,7 @@ export class Consumer extends EventEmitter {

private async processMessageBatch(messages: SQSMessage[]): Promise<void> {
messages.forEach((message) => {
this.emit('message_received', message);
this.emit('message_received', message, this.queueUrl);
});

this.reportNumberOfMessagesReceived(messages.length);
Expand Down

0 comments on commit df63e73

Please sign in to comment.