-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathsqs.service.ts
161 lines (137 loc) · 5.19 KB
/
sqs.service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import { GetQueueAttributesCommand, PurgeQueueCommand, QueueAttributeName, SQSClient } from '@aws-sdk/client-sqs';
import { DiscoveryService } from '@golevelup/nestjs-discovery';
import { Inject, Injectable, Logger, LoggerService, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Consumer, StopOptions } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { SQS_CONSUMER_EVENT_HANDLER, SQS_CONSUMER_METHOD, SQS_OPTIONS } from './sqs.constants';
import {
Message,
QueueName,
SqsConsumerEventHandlerMeta,
SqsConsumerMapValues,
SqsMessageHandlerMeta,
SqsOptions,
} from './sqs.types';
/**
 * Builds the SQS consumers and producers described in the injected
 * {@link SqsOptions}, wires discovered handler methods onto the consumers,
 * and exposes helpers for sending messages and inspecting/purging queues.
 */
@Injectable()
export class SqsService implements OnModuleInit, OnModuleDestroy {
  /** Registered consumers (with their stop options), keyed by queue name. */
  public readonly consumers = new Map<QueueName, SqsConsumerMapValues>();
  /** Registered producers, keyed by queue name. */
  public readonly producers = new Map<QueueName, Producer>();

  // Both fields are assigned in onModuleInit, not in the constructor.
  private logger: LoggerService;
  private globalStopOptions: StopOptions;

  public constructor(
    @Inject(SQS_OPTIONS) public readonly options: SqsOptions,
    private readonly discover: DiscoveryService,
  ) {}

  /**
   * Creates one consumer per entry in `options.consumers` and one producer per
   * entry in `options.producers`, then starts every registered consumer.
   *
   * A consumer entry with no discovered message handler of the same name is
   * skipped with a warning.
   *
   * @throws Error if two consumer entries, or two producer entries, share a name.
   */
  public async onModuleInit(): Promise<void> {
    this.logger = this.options.logger ?? new Logger('SqsService', { timestamp: false });
    this.globalStopOptions = this.options.globalStopOptions ?? {};

    const messageHandlers =
      await this.discover.providerMethodsWithMetaAtKey<SqsMessageHandlerMeta>(SQS_CONSUMER_METHOD);
    const eventHandlers =
      await this.discover.providerMethodsWithMetaAtKey<SqsConsumerEventHandlerMeta>(SQS_CONSUMER_EVENT_HANDLER);

    this.options.consumers?.forEach((options) => {
      const { name, stopOptions, ...consumerOptions } = options;

      if (this.consumers.has(name)) {
        throw new Error(`Consumer already exists: ${name}`);
      }

      const metadata = messageHandlers.find(({ meta }) => meta.name === name);
      if (!metadata) {
        this.logger.warn(`No metadata found for: ${name}`);
        return;
      }

      // Bind once so the handler keeps its provider instance as `this`.
      const { handler, parentClass } = metadata.discoveredMethod;
      const boundHandler = handler.bind(parentClass.instance);

      const consumer = Consumer.create({
        ...consumerOptions,
        // The handler's `batch` flag selects which consumer callback it fills.
        ...(metadata.meta.batch === true ? { handleMessageBatch: boundHandler } : { handleMessage: boundHandler }),
      });

      for (const eventMetadata of eventHandlers) {
        if (eventMetadata.meta.name !== name) {
          continue;
        }
        // Bind to the event handler's OWN provider instance: it may be declared
        // on a different class than the queue's message handler.
        consumer.addListener(
          eventMetadata.meta.eventName,
          eventMetadata.discoveredMethod.handler.bind(eventMetadata.discoveredMethod.parentClass.instance),
        );
      }

      // Per-consumer stop options take precedence over the global ones.
      this.consumers.set(name, { instance: consumer, stopOptions: stopOptions ?? this.globalStopOptions });
    });

    this.options.producers?.forEach((options) => {
      const { name, ...producerOptions } = options;

      if (this.producers.has(name)) {
        throw new Error(`Producer already exists: ${name}`);
      }

      this.producers.set(name, Producer.create(producerOptions));
    });

    for (const consumer of this.consumers.values()) {
      consumer.instance.start();
    }
  }

  /** Calls `stop` on every registered consumer, passing its stored stop options. */
  public onModuleDestroy() {
    for (const consumer of this.consumers.values()) {
      consumer.instance.stop(consumer.stopOptions);
    }
  }

  /**
   * Looks up the SQS client and queue URL for a registered queue name.
   * A consumer registered under `name` is preferred over a producer.
   *
   * @throws Error if neither a consumer nor a producer is registered under
   *   `name`, or if the resolved instance has no `sqs` client.
   */
  private getQueueInfo(name: QueueName) {
    if (!this.consumers.has(name) && !this.producers.has(name)) {
      throw new Error(`Consumer/Producer does not exist: ${name}`);
    }

    // `sqs` and `queueUrl` are read straight off the consumer/producer instance
    // through a cast; they are not part of a typed public surface here.
    const { sqs, queueUrl } = (this.consumers.get(name)?.instance ?? this.producers.get(name)) as {
      sqs: SQSClient;
      queueUrl: string;
    };
    if (!sqs) {
      throw new Error('SQS instance does not exist');
    }

    return {
      sqs,
      queueUrl,
    };
  }

  /**
   * Sends a `PurgeQueueCommand` for the queue registered under `name`.
   *
   * @returns the response of the SQS client's `send` call.
   * @throws Error if `name` is not a registered consumer or producer.
   */
  public async purgeQueue(name: QueueName) {
    const { sqs, queueUrl } = this.getQueueInfo(name);
    const command = new PurgeQueueCommand({
      QueueUrl: queueUrl,
    });
    return await sqs.send(command);
  }

  /**
   * Requests all attributes (`AttributeNames: ['All']`) of the queue
   * registered under `name`.
   *
   * @returns the `Attributes` field of the response.
   * @throws Error if `name` is not a registered consumer or producer.
   */
  public async getQueueAttributes(name: QueueName) {
    const { sqs, queueUrl } = this.getQueueInfo(name);
    const command = new GetQueueAttributesCommand({
      QueueUrl: queueUrl,
      AttributeNames: ['All'],
    });
    const response = await sqs.send(command);
    return response.Attributes as { [key in QueueAttributeName]: string };
  }

  /**
   * Returns the result of `queueSize()` on the producer registered under `name`.
   *
   * @throws Error if no producer is registered under `name`.
   */
  public getProducerQueueSize(name: QueueName) {
    const producer = this.producers.get(name);
    if (!producer) {
      throw new Error(`Producer does not exist: ${name}`);
    }
    return producer.queueSize();
  }

  /**
   * Sends one message or an array of messages through the producer registered
   * under `name`. Any `body` that is not already a string is serialised with
   * `JSON.stringify`; all other message fields are passed through unchanged.
   *
   * @returns the result of the producer's `send` call.
   * @throws Error if no producer is registered under `name`.
   */
  public send<T = any>(name: QueueName, payload: Message<T> | Message<T>[]) {
    const producer = this.producers.get(name);
    if (!producer) {
      throw new Error(`Producer does not exist: ${name}`);
    }

    const originalMessages = Array.isArray(payload) ? payload : [payload];
    const messages = originalMessages.map((message) => ({
      ...message,
      body: typeof message.body === 'string' ? message.body : JSON.stringify(message.body),
    }));

    return producer.send(messages as any[]);
  }
}