From 2c9bedf219c9186accb67648e004881bc580449e Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Fri, 13 Dec 2024 10:58:15 +0100 Subject: [PATCH] Concurrent consumers support (#232) * Allow to specify number of concurrent consumers to create * linting * improved test flow, added concurrentConsumersAmount parameter to README * tests fix * linting * concurrent consumers test for SnsSqsPermissionConsumer * Update README.md Co-authored-by: Igor Savin * Prepare to release @message-queue-toolkit/core 17.2.3 * Prepare to release @message-queue-toolkit/sqs 17.3.0 * Fixed import * Prepare to release @message-queue-toolkit/sns 18.1.1 * Updated sns package version --------- Co-authored-by: Igor Savin --- README.md | 1 + .../consumers/AmqpPermissionConsumer.spec.ts | 2 +- .../core/lib/queues/AbstractQueueService.ts | 16 ++++- packages/core/package.json | 2 +- packages/sns/package.json | 4 +- .../SnsSqsPermissionConsumer.spec.ts | 64 ++++++++++++++++++- .../consumers/SnsSqsPermissionConsumer.ts | 4 ++ packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 52 +++++++++------ packages/sqs/package.json | 2 +- .../consumers/SqsPermissionConsumer.spec.ts | 61 ++++++++++++++++++ .../test/consumers/SqsPermissionConsumer.ts | 8 ++- 11 files changed, 186 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index fd82b538..c917cc60 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Multi-schema consumers support multiple message types via handler configs. They * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); * `logMessages` - add logs for processed messages. * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). + * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers * `init()`, prepare consumer for use (e. g. establish all necessary connections); * `close()`, stop listening for messages and disconnect; * `start()`, which invokes `init()`. diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 469d2eb2..4ca37367 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -91,7 +91,7 @@ describe('AmqpPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(logger.loggedMessages.length).toBe(5) - expect(logger.loggedMessages).toEqual([ + expect(logger.loggedMessages).toMatchObject([ 'Propagating new connection across 0 receivers', { id: '1', diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index b5d4010a..8f55ac9c 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -165,14 +165,27 @@ export abstract class AbstractQueueService< } protected logProcessedMessage( - _message: MessagePayloadSchemas | null, + message: MessagePayloadSchemas | null, processingResult: MessageProcessingResult, messageId?: string, ) { + const messageTimestamp = message ? this.tryToExtractTimestamp(message) : undefined + const messageProcessingMilliseconds = messageTimestamp + ? Date.now() - messageTimestamp.getTime() + : undefined + + const messageType = + message && this.messageTypeField in message + ? // @ts-ignore + message[this.messageTypeField] + : undefined + this.logger.debug( { processingResult, messageId, + messageProcessingTime: messageProcessingMilliseconds, + messageType, }, `Finished processing message ${messageId ?? `(unknown id)`}`, ) @@ -206,7 +219,6 @@ export abstract class AbstractQueueService< if (this.logMessages) { // @ts-ignore const resolvedMessageId: string | undefined = message?.[this.messageIdField] ?? messageId - this.logProcessedMessage(message, processingResult, resolvedMessageId) } } diff --git a/packages/core/package.json b/packages/core/package.json index 3ff17bb1..642a35a0 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "17.2.1", + "version": "17.2.3", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently", diff --git a/packages/sns/package.json b/packages/sns/package.json index 5f428cb5..b9f14a1d 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "18.0.1", + "version": "18.1.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -35,7 +35,7 @@ "@aws-sdk/client-sts": "^3.632.0", "@message-queue-toolkit/core": ">=15.0.0", "@message-queue-toolkit/schemas": ">=2.0.0", - "@message-queue-toolkit/sqs": "^17.0.0" + "@message-queue-toolkit/sqs": "^17.3.0" }, "devDependencies": { "@aws-sdk/client-s3": "^3.670.0", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 6fabbcf3..1200dbcf 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -3,8 +3,8 @@ import { ListTagsForResourceCommand, type SNSClient } from '@aws-sdk/client-sns' import { ListQueueTagsCommand, type SQSClient } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' -import { type AwilixContainer, asValue } from 'awilix' -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { type AwilixContainer, asFunction, asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils' import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' @@ -13,6 +13,7 @@ import type { Dependencies } from '../utils/testContext' import type { STSClient } from '@aws-sdk/client-sts' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' describe('SnsSqsPermissionConsumer', () => { describe('init', () => { @@ -706,6 +707,65 @@ describe('SnsSqsPermissionConsumer', () => { }) }) + describe('multiple consumers', () => { + let diContainer: AwilixContainer + + let publisher: SnsPermissionPublisher + let consumer: SnsSqsPermissionConsumer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asFunction((dependencies) => { + return new SnsSqsPermissionConsumer(dependencies, { + creationConfig: { + topic: { + Name: SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME, + }, + queue: { + QueueName: SnsSqsPermissionConsumer.CONSUMED_QUEUE_NAME, + }, + updateAttributesIfExists: true, + }, + deletionConfig: { + deleteIfExists: true, + }, + concurrentConsumersAmount: 10, + }) + }), + }) + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer + + await consumer.start() + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('process all messages properly', async () => { + const messagesAmount = 50 + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map( + (_, i) => ({ + id: `${i}`, + messageType: 'add', + timestamp: new Date().toISOString(), + }), + ) + + messages.map((m) => publisher.publish(m)) + await Promise.all( + messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + + // Verifies that each message is executed only once + expect(consumer.addCounter).toBe(messagesAmount) + // Verifies that no message is lost + expect(consumer.processedMessagesIds).toHaveLength(messagesAmount) + }) + }) + describe('visibility timeout', () => { const topicName = 'myTestTopic' const queueName = 'myTestQueue' diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index 3a24cc51..4487ffb4 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -39,6 +39,7 @@ type SnsSqsPermissionConsumerOptions = Pick< | 'consumerOverrides' | 'maxRetryDuration' | 'payloadStoreConfig' + | 'concurrentConsumersAmount' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -65,6 +66,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< public addBarrierCounter = 0 public removeCounter = 0 public preHandlerCounter = 0 + public processedMessagesIds: Set = new Set() constructor( dependencies: SNSSQSConsumerDependencies, @@ -101,6 +103,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< PERMISSIONS_ADD_MESSAGE_SCHEMA, (_message, context, _preHandlingOutputs) => { this.addCounter += context.incrementAmount + this.processedMessagesIds.add(_message.id) return Promise.resolve({ result: 'success' }) }, { @@ -164,6 +167,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< updateAttributesIfExists: false, }, maxRetryDuration: options.maxRetryDuration, + concurrentConsumersAmount: options.concurrentConsumersAmount, }, { incrementAmount: 1, diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 0d02fdfc..256efdce 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -63,6 +63,7 @@ export type SQSConsumerOptions< ConsumerOptions, 'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout' > + concurrentConsumersAmount?: number } export abstract class AbstractSqsConsumer< @@ -96,7 +97,8 @@ export abstract class AbstractSqsConsumer< > implements QueueConsumer { - private consumer?: Consumer + private consumers: Consumer[] + private readonly concurrentConsumersAmount: number private readonly transactionObservabilityManager?: TransactionObservabilityManager private readonly consumerOptionsOverride: Partial private readonly handlerContainer: HandlerContainer< @@ -129,7 +131,8 @@ export abstract class AbstractSqsConsumer< this.deadLetterQueueOptions = options.deadLetterQueue this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION this.executionContext = executionContext - + this.consumers = [] + this.concurrentConsumersAmount = options.concurrentConsumersAmount ?? 1 this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options) this.handlerContainer = new HandlerContainer< MessagePayloadType, @@ -174,14 +177,34 @@ export abstract class AbstractSqsConsumer< public async start() { await this.init() - if (this.consumer) this.consumer.stop() + this.stopExistingConsumers() const visibilityTimeout = await this.getQueueVisibilityTimeout() - this.consumer = Consumer.create({ + this.consumers = Array.from({ length: this.concurrentConsumersAmount }).map((_) => + this.createConsumer({ visibilityTimeout }), + ) + + for (const consumer of this.consumers) { + consumer.on('error', (err) => { + this.handleError(err, { + queueName: this.queueName, + }) + }) + consumer.start() + } + } + + public override async close(abort?: boolean): Promise { + await super.close() + this.stopExistingConsumers(abort ?? false) + } + + private createConsumer(options: { visibilityTimeout: number | undefined }): Consumer { + return Consumer.create({ sqs: this.sqsClient, queueUrl: this.queueUrl, - visibilityTimeout, + visibilityTimeout: options.visibilityTimeout, messageAttributeNames: [`${PAYLOAD_OFFLOADING_ATTRIBUTE_PREFIX}*`], ...this.consumerOptionsOverride, handleMessage: async (message: SQSMessage) => { @@ -250,21 +273,14 @@ export abstract class AbstractSqsConsumer< return Promise.reject(result.error) }, }) - - this.consumer.on('error', (err) => { - this.handleError(err, { - queueName: this.queueName, - }) - }) - - this.consumer.start() } - public override async close(abort?: boolean): Promise { - await super.close() - this.consumer?.stop({ - abort: abort ?? false, - }) + private stopExistingConsumers(abort?: boolean) { + for (const consumer of this.consumers) { + consumer.stop({ + abort, + }) + } } private async internalProcessMessage( diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 6f9eb69e..8f8bea5d 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "17.2.0", + "version": "17.3.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 1fca15c4..90462946 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -21,6 +21,7 @@ import { SINGLETON_CONFIG, registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import { SqsPermissionConsumer } from './SqsPermissionConsumer' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas' describe('SqsPermissionConsumer', () => { describe('init', () => { @@ -596,6 +597,66 @@ describe('SqsPermissionConsumer', () => { }) }) + describe('multiple consumers', () => { + let diContainer: AwilixContainer + let sqsClient: SQSClient + + let publisher: SqsPermissionPublisher + let consumer: SqsPermissionConsumer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asFunction((dependencies) => { + return new SqsPermissionConsumer(dependencies, { + creationConfig: { + queue: { + QueueName: SqsPermissionConsumer.QUEUE_NAME, + }, + }, + concurrentConsumersAmount: 5, + }) + }), + }) + sqsClient = diContainer.cradle.sqsClient + publisher = diContainer.cradle.permissionPublisher + consumer = diContainer.cradle.permissionConsumer + + await consumer.start() + + const command = new ReceiveMessageCommand({ + QueueUrl: publisher.queueProps.url, + }) + const reply = await sqsClient.send(command) + expect(reply.Messages).toBeUndefined() + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('process all messages properly', async () => { + const messagesAmount = 100 + const messages: PERMISSIONS_ADD_MESSAGE_TYPE[] = Array.from({ length: messagesAmount }).map( + (_, i) => ({ + id: `${i}`, + messageType: 'add', + timestamp: new Date().toISOString(), + }), + ) + + messages.map((m) => publisher.publish(m)) + await Promise.all( + messages.map((m) => consumer.handlerSpy.waitForMessageWithId(m.id, 'consumed')), + ) + + // Verifies that each message is executed only once + expect(consumer.addCounter).toBe(messagesAmount) + // Verifies that no message is lost + expect(consumer.processedMessagesIds).toHaveLength(messagesAmount) + }) + }) + describe('visibility timeout', () => { const queueName = 'myTestQueue' let diContainer: AwilixContainer diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 1385e8f5..6d926c9a 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -38,6 +38,7 @@ type SqsPermissionConsumerOptions = Pick< preHandlingOutputs: PreHandlingOutputs, ) => Promise> removePreHandlers?: Prehandler[] + concurrentConsumersAmount?: number } type ExecutionContext = { @@ -54,6 +55,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< > { public addCounter = 0 public removeCounter = 0 + public processedMessagesIds: Set = new Set() public static readonly QUEUE_NAME = 'user_permissions_multi' constructor( @@ -97,6 +99,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately }, + concurrentConsumersAmount: options.concurrentConsumersAmount, maxRetryDuration: options.maxRetryDuration, payloadStoreConfig: options.payloadStoreConfig, handlers: new MessageHandlerConfigBuilder< @@ -111,9 +114,8 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< return Promise.resolve({ error: 'retryLater' }) } this.addCounter += context.incrementAmount - return Promise.resolve({ - result: 'success', - }) + this.processedMessagesIds.add(_message.id) + return Promise.resolve({ result: 'success' }) }, { preHandlerBarrier: options.addPreHandlerBarrier,