From fd4052031c693244811f4c9310e28aa9127cd9d9 Mon Sep 17 00:00:00 2001 From: CarlosGamero <101278162+CarlosGamero@users.noreply.github.com> Date: Tue, 23 Apr 2024 17:29:32 +0200 Subject: [PATCH] Visibility timeout heartbeat (#124) * TDD SQS visibility timeout * Improving test to be reliable * Fixing exposed parameters on consumerOverrides * Setting consumer visibilityTimeout * Preparing test * node-core update * Minor fix * Adding test for heartbeatinterval * Adjusting type * Removing comment * SQS major + test * SNS supporting heartbeatInterval + tests * SNS major * Minor improvement * Using asValue on tests * Improving visibilityTimeout readability * Using Omit on consumerOverrides * Introducing getQueueVisibilityTimeout * Improving tests * Using setTimeout --- packages/sns/package.json | 6 +- .../SnsSqsPermissionConsumer.spec.ts | 78 +++++++++++++++++- .../consumers/SnsSqsPermissionConsumer.ts | 8 +- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 40 ++++++++-- packages/sqs/package.json | 4 +- .../consumers/SqsPermissionConsumer.spec.ts | 80 ++++++++++++++++++- .../test/consumers/SqsPermissionConsumer.ts | 9 ++- 7 files changed, 203 insertions(+), 22 deletions(-) diff --git a/packages/sns/package.json b/packages/sns/package.json index 4cebc74d..1d6ace57 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "12.1.1", + "version": "13.0.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.10.1", + "@lokalise/node-core": "^9.14.0", "sqs-consumer": "^9.1.0", "zod": "^3.22.4" }, @@ -33,7 +33,7 @@ "@aws-sdk/client-sns": "^3.476.0", "@aws-sdk/client-sqs": "^3.476.0", "@message-queue-toolkit/core": "^10.1.1", - "@message-queue-toolkit/sqs": "^12.1.1" + "@message-queue-toolkit/sqs": "^13.0.0" }, "devDependencies": { "@aws-sdk/client-sns": "^3.529.1", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index e1815bfa..85c25e41 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -1,11 +1,15 @@ +import { setTimeout } from 'node:timers/promises' + import type { SNSClient } from '@aws-sdk/client-sns' import type { SQSClient } from '@aws-sdk/client-sqs' +import { waitAndRetry } from '@lokalise/node-core' import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs' import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' import { describe, beforeEach, afterEach, expect, it, beforeAll } from 'vitest' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils' -import type { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' +import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' @@ -88,6 +92,7 @@ describe('SnsSqsPermissionConsumer', () => { QueueName: 'existingQueue', Attributes: { KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', }, }, updateAttributesIfExists: true, @@ -106,7 +111,10 @@ describe('SnsSqsPermissionConsumer', () => { queueUrl: newConsumer.subscriptionProps.queueUrl, }) - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') + expect(attributes.result?.attributes).toMatchObject({ + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }) }) it('updates existing queue when one with different attributes exist and sets the policy', async () => { @@ -435,4 +443,70 @@ describe('SnsSqsPermissionConsumer', () => { }) }) }) + + describe('visibility timeout', () => { + const topicName = 'myTestTopic' + const queueName = 'myTestQueue' + let diContainer: AwilixContainer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => { + let consumer1IsProcessing = false + let consumer1Counter = 0 + let consumer2Counter = 0 + + const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicName }, + queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } }, + }, + consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, + removeHandlerOverride: async () => { + consumer1IsProcessing = true + await setTimeout(2800) // Wait to the visibility timeout to expire + consumer1Counter++ + consumer1IsProcessing = false + return { result: 'success' } + }, + }) + await consumer1.start() + + const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { + queueUrl: consumer1.subscriptionProps.queueUrl, + topicArn: consumer1.subscriptionProps.topicArn, + subscriptionArn: consumer1.subscriptionProps.subscriptionArn, + }, + removeHandlerOverride: async () => { + consumer2Counter++ + return { result: 'success' } + }, + }) + const publisher = new SnsPermissionPublisher(diContainer.cradle, { + locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn }, + }) + + await publisher.publish({ id: '10', messageType: 'remove' }) + // wait for consumer1 to start processing to start second consumer + await waitAndRetry(() => consumer1IsProcessing, 5, 5) + await consumer2.start() + + // wait for both consumers to process message + await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) + + expect(consumer1Counter).toBe(1) + expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) + }) + }) }) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index 8927aa41..599c5d4d 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -32,7 +32,7 @@ type PrehandlerOutput = { type SnsSqsPermissionConsumerOptions = Pick< SNSSQSConsumerOptions, - 'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue' + 'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue' | 'consumerOverrides' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -143,6 +143,9 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< deletionConfig: options.deletionConfig ?? { deleteIfExists: true, }, + consumerOverrides: options.consumerOverrides ?? { + terminateVisibilityTimeout: true, // this allows to retry failed messages immediately + }, deadLetterQueue: options.deadLetterQueue, ...(options.locatorConfig ? { locatorConfig: options.locatorConfig } @@ -153,9 +156,6 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< }, }), messageTypeField: 'messageType', - consumerOverrides: { - terminateVisibilityTimeout: true, // this allows to retry failed messages immediately - }, subscriptionConfig: { updateAttributesIfExists: false, }, diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 19327992..34703f5c 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -22,6 +22,7 @@ import type { ConsumerOptions } from 'sqs-consumer/src/types' import type { SQSMessage } from '../types/MessageTypes' import { deleteSqs, initSqs } from '../utils/sqsInitter' import { readSqsMessage } from '../utils/sqsMessageReader' +import { getQueueAttributes } from '../utils/sqsUtils' import type { SQSCreationConfig, SQSDependencies, SQSQueueLocatorType } from './AbstractSqsService' import { AbstractSqsService } from './AbstractSqsService' @@ -54,8 +55,16 @@ export type SQSConsumerOptions< SQSCreationConfig, SQSQueueLocatorType > & { - consumerOverrides?: Partial + /** + * Omitting properties which will be set internally ins this class + * `visibilityTimeout` is also omitted to avoid conflicts with queue config + */ + consumerOverrides?: Omit< + ConsumerOptions, + 'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout' + > } + export abstract class AbstractSqsConsumer< MessagePayloadType extends object, ExecutionContext, @@ -170,12 +179,15 @@ export abstract class AbstractSqsConsumer< public async start() { await this.init() + if (this.consumer) this.consumer.stop() + + const visibilityTimeout = await this.getQueueVisibilityTimeout() - if (this.consumer) { - this.consumer.stop() - } this.consumer = Consumer.create({ + sqs: this.sqsClient, queueUrl: this.queueUrl, + visibilityTimeout, + ...this.consumerOptionsOverride, handleMessage: async (message: SQSMessage) => { if (message === null) return @@ -235,8 +247,6 @@ export abstract class AbstractSqsConsumer< return Promise.reject(result.error) }, - sqs: this.sqsClient, - ...this.consumerOptionsOverride, }) this.consumer.on('error', (err) => { @@ -411,4 +421,22 @@ export abstract class AbstractSqsConsumer< }) await this.sqsClient.send(command) } + + private async getQueueVisibilityTimeout(): Promise { + let visibilityTimeoutString + if (this.creationConfig) { + visibilityTimeoutString = this.creationConfig.queue.Attributes?.VisibilityTimeout + } else { + // if user is using locatorConfig, we should look into queue config + const queueAttributes = await getQueueAttributes( + this.sqsClient, + { queueUrl: this.queueUrl }, + ['VisibilityTimeout'], + ) + visibilityTimeoutString = queueAttributes.result?.attributes?.VisibilityTimeout + } + + // parseInt is safe because if the value is not a number process should have failed on init + return visibilityTimeoutString ? parseInt(visibilityTimeoutString) : undefined + } } diff --git a/packages/sqs/package.json b/packages/sqs/package.json index 1fed507e..b1cbf7fa 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "12.1.1", + "version": "13.0.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", @@ -25,7 +25,7 @@ "prepublishOnly": "npm run build:release" }, "dependencies": { - "@lokalise/node-core": "^9.10.1", + "@lokalise/node-core": "^9.14.0", "sqs-consumer": "^9.1.0", "zod": "^3.22.4" }, diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 1b1dc87e..9147dc03 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -1,16 +1,18 @@ +import { setTimeout } from 'node:timers/promises' + import type { SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs' import { SendMessageCommand, ReceiveMessageCommand } from '@aws-sdk/client-sqs' import { waitAndRetry } from '@lokalise/node-core' import type { BarrierResult } from '@message-queue-toolkit/core' import type { AwilixContainer } from 'awilix' -import { asClass, asFunction } from 'awilix' +import { asValue, asClass, asFunction } from 'awilix' import { describe, beforeEach, afterEach, expect, it } from 'vitest' import { ZodError } from 'zod' import { FakeConsumerErrorResolver } from '../../lib/fakes/FakeConsumerErrorResolver' import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils' import { FakeLogger } from '../fakes/FakeLogger' -import type { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher' +import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' @@ -75,6 +77,7 @@ describe('SqsPermissionConsumer', () => { QueueName: queueName, Attributes: { KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', }, }, updateAttributesIfExists: true, @@ -101,7 +104,10 @@ describe('SqsPermissionConsumer', () => { queueUrl: newConsumer.queueProps.url, }) - expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue') + expect(attributes.result?.attributes).toMatchObject({ + KmsMasterKeyId: 'othervalue', + VisibilityTimeout: '10', + }) }) it('does not update existing queue when attributes did not change', async () => { @@ -478,4 +484,72 @@ describe('SqsPermissionConsumer', () => { expect(consumer.removeCounter).toBe(2) }) }) + + describe('visibility timeout', () => { + const queueName = 'myTestQueue' + let diContainer: AwilixContainer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('heartbeatInterval should be less than visibilityTimeout', async () => { + const consumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '1' } } }, + consumerOverrides: { heartbeatInterval: 2 }, + }) + await expect(() => consumer.start()).rejects.toThrow( + /heartbeatInterval must be less than visibilityTimeout/, + ) + }) + + it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => { + let consumer1IsProcessing = false + let consumer1Counter = 0 + let consumer2Counter = 0 + + const consumer1 = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } } }, + consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined }, + removeHandlerOverride: async () => { + consumer1IsProcessing = true + await setTimeout(2800) // Wait to the visibility timeout to expire + consumer1Counter++ + consumer1IsProcessing = false + return { result: 'success' } + }, + }) + await consumer1.start() + + const consumer2 = new SqsPermissionConsumer(diContainer.cradle, { + locatorConfig: { queueUrl: consumer1.queueProps.url }, + removeHandlerOverride: async () => { + consumer2Counter++ + return { result: 'success' } + }, + }) + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + locatorConfig: { queueUrl: consumer1.queueProps.url }, + }) + + await publisher.publish({ id: '10', messageType: 'remove' }) + // wait for consumer1 to start processing to start second consumer + await waitAndRetry(() => consumer1IsProcessing, 5, 5) + await consumer2.start() + + // wait for both consumers to process message + await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40) + + expect(consumer1Counter).toBe(1) + expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1) + }) + }) }) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index c2d66f3a..47bdcc66 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -18,7 +18,12 @@ type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSA type SqsPermissionConsumerOptions = Pick< SQSConsumerOptions, - 'creationConfig' | 'locatorConfig' | 'logMessages' | 'deletionConfig' | 'deadLetterQueue' + | 'creationConfig' + | 'locatorConfig' + | 'logMessages' + | 'deletionConfig' + | 'deadLetterQueue' + | 'consumerOverrides' > & { addPreHandlerBarrier?: ( message: SupportedMessages, @@ -87,7 +92,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< deadLetterQueue: options.deadLetterQueue, messageTypeField: 'messageType', handlerSpy: true, - consumerOverrides: { + consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately }, handlers: new MessageHandlerConfigBuilder<