Skip to content

Commit

Permalink
Visibility timeout heartbeat (#124)
Browse files Browse the repository at this point in the history
* TDD SQS visibility timeout

* Improving test to be reliable

* Fixing exposed parameters on consumerOverrides

* Setting consumer visibilityTimeout

* Preparing test

* node-core update

* Minor fix

* Adding test for heartbeatinterval

* Adjusting type

* Removing comment

* SQS major + test

* SNS supporting heartbeatInterval + tests

* SNS major

* Minor improvement

* Using asValue on tests

* Improving visibilityTimeout readability

* Using Omit on consumerOverrides

* Introducing getQueueVisibilityTimeout

* Improving tests

* Using setTimeout
  • Loading branch information
CarlosGamero authored Apr 23, 2024
1 parent d02db20 commit fd40520
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 22 deletions.
6 changes: 3 additions & 3 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "12.1.1",
"version": "13.0.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand All @@ -25,15 +25,15 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.10.1",
"@lokalise/node-core": "^9.14.0",
"sqs-consumer": "^9.1.0",
"zod": "^3.22.4"
},
"peerDependencies": {
"@aws-sdk/client-sns": "^3.476.0",
"@aws-sdk/client-sqs": "^3.476.0",
"@message-queue-toolkit/core": "^10.1.1",
"@message-queue-toolkit/sqs": "^12.1.1"
"@message-queue-toolkit/sqs": "^13.0.0"
},
"devDependencies": {
"@aws-sdk/client-sns": "^3.529.1",
Expand Down
78 changes: 76 additions & 2 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { setTimeout } from 'node:timers/promises'

import type { SNSClient } from '@aws-sdk/client-sns'
import type { SQSClient } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import type { AwilixContainer } from 'awilix'
import { asValue } from 'awilix'
import { describe, beforeEach, afterEach, expect, it, beforeAll } from 'vitest'

import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils'
import type { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
import { registerDependencies } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

Expand Down Expand Up @@ -88,6 +92,7 @@ describe('SnsSqsPermissionConsumer', () => {
QueueName: 'existingQueue',
Attributes: {
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
},
},
updateAttributesIfExists: true,
Expand All @@ -106,7 +111,10 @@ describe('SnsSqsPermissionConsumer', () => {
queueUrl: newConsumer.subscriptionProps.queueUrl,
})

expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue')
expect(attributes.result?.attributes).toMatchObject({
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
})
})

it('updates existing queue when one with different attributes exist and sets the policy', async () => {
Expand Down Expand Up @@ -435,4 +443,70 @@ describe('SnsSqsPermissionConsumer', () => {
})
})
})

describe('visibility timeout', () => {
const topicName = 'myTestTopic'
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asValue(() => undefined),
permissionPublisher: asValue(() => undefined),
})
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
topic: { Name: topicName },
queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } },
},
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(2800) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()

const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: consumer1.subscriptionProps.queueUrl,
topicArn: consumer1.subscriptionProps.topicArn,
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
},
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
})

await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()

// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)

expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
})
})
})
8 changes: 4 additions & 4 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type PrehandlerOutput = {

type SnsSqsPermissionConsumerOptions = Pick<
SNSSQSConsumerOptions<SupportedMessages, ExecutionContext, PrehandlerOutput>,
'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue'
'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue' | 'consumerOverrides'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand Down Expand Up @@ -143,6 +143,9 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
deletionConfig: options.deletionConfig ?? {
deleteIfExists: true,
},
consumerOverrides: options.consumerOverrides ?? {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
deadLetterQueue: options.deadLetterQueue,
...(options.locatorConfig
? { locatorConfig: options.locatorConfig }
Expand All @@ -153,9 +156,6 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
},
}),
messageTypeField: 'messageType',
consumerOverrides: {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
subscriptionConfig: {
updateAttributesIfExists: false,
},
Expand Down
40 changes: 34 additions & 6 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type { ConsumerOptions } from 'sqs-consumer/src/types'
import type { SQSMessage } from '../types/MessageTypes'
import { deleteSqs, initSqs } from '../utils/sqsInitter'
import { readSqsMessage } from '../utils/sqsMessageReader'
import { getQueueAttributes } from '../utils/sqsUtils'

import type { SQSCreationConfig, SQSDependencies, SQSQueueLocatorType } from './AbstractSqsService'
import { AbstractSqsService } from './AbstractSqsService'
Expand Down Expand Up @@ -54,8 +55,16 @@ export type SQSConsumerOptions<
SQSCreationConfig,
SQSQueueLocatorType
> & {
consumerOverrides?: Partial<ConsumerOptions>
/**
* Omitting properties which will be set internally ins this class
* `visibilityTimeout` is also omitted to avoid conflicts with queue config
*/
consumerOverrides?: Omit<
ConsumerOptions,
'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout'
>
}

export abstract class AbstractSqsConsumer<
MessagePayloadType extends object,
ExecutionContext,
Expand Down Expand Up @@ -170,12 +179,15 @@ export abstract class AbstractSqsConsumer<

public async start() {
await this.init()
if (this.consumer) this.consumer.stop()

const visibilityTimeout = await this.getQueueVisibilityTimeout()

if (this.consumer) {
this.consumer.stop()
}
this.consumer = Consumer.create({
sqs: this.sqsClient,
queueUrl: this.queueUrl,
visibilityTimeout,
...this.consumerOptionsOverride,
handleMessage: async (message: SQSMessage) => {
if (message === null) return

Expand Down Expand Up @@ -235,8 +247,6 @@ export abstract class AbstractSqsConsumer<

return Promise.reject(result.error)
},
sqs: this.sqsClient,
...this.consumerOptionsOverride,
})

this.consumer.on('error', (err) => {
Expand Down Expand Up @@ -411,4 +421,22 @@ export abstract class AbstractSqsConsumer<
})
await this.sqsClient.send(command)
}

private async getQueueVisibilityTimeout(): Promise<number | undefined> {
let visibilityTimeoutString
if (this.creationConfig) {
visibilityTimeoutString = this.creationConfig.queue.Attributes?.VisibilityTimeout
} else {
// if user is using locatorConfig, we should look into queue config
const queueAttributes = await getQueueAttributes(
this.sqsClient,
{ queueUrl: this.queueUrl },
['VisibilityTimeout'],
)
visibilityTimeoutString = queueAttributes.result?.attributes?.VisibilityTimeout
}

// parseInt is safe because if the value is not a number process should have failed on init
return visibilityTimeoutString ? parseInt(visibilityTimeoutString) : undefined
}
}
4 changes: 2 additions & 2 deletions packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "12.1.1",
"version": "13.0.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand All @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.10.1",
"@lokalise/node-core": "^9.14.0",
"sqs-consumer": "^9.1.0",
"zod": "^3.22.4"
},
Expand Down
80 changes: 77 additions & 3 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { setTimeout } from 'node:timers/promises'

import type { SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import { SendMessageCommand, ReceiveMessageCommand } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import type { BarrierResult } from '@message-queue-toolkit/core'
import type { AwilixContainer } from 'awilix'
import { asClass, asFunction } from 'awilix'
import { asValue, asClass, asFunction } from 'awilix'
import { describe, beforeEach, afterEach, expect, it } from 'vitest'
import { ZodError } from 'zod'

import { FakeConsumerErrorResolver } from '../../lib/fakes/FakeConsumerErrorResolver'
import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils'
import { FakeLogger } from '../fakes/FakeLogger'
import type { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher'
import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

Expand Down Expand Up @@ -75,6 +77,7 @@ describe('SqsPermissionConsumer', () => {
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
},
},
updateAttributesIfExists: true,
Expand All @@ -101,7 +104,10 @@ describe('SqsPermissionConsumer', () => {
queueUrl: newConsumer.queueProps.url,
})

expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue')
expect(attributes.result?.attributes).toMatchObject({
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
})
})

it('does not update existing queue when attributes did not change', async () => {
Expand Down Expand Up @@ -478,4 +484,72 @@ describe('SqsPermissionConsumer', () => {
expect(consumer.removeCounter).toBe(2)
})
})

describe('visibility timeout', () => {
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>

beforeEach(async () => {
diContainer = await registerDependencies({
permissionPublisher: asValue(() => undefined),
permissionConsumer: asValue(() => undefined),
})
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('heartbeatInterval should be less than visibilityTimeout', async () => {
const consumer = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '1' } } },
consumerOverrides: { heartbeatInterval: 2 },
})
await expect(() => consumer.start()).rejects.toThrow(
/heartbeatInterval must be less than visibilityTimeout/,
)
})

it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } } },
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(2800) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()

const consumer2 = new SqsPermissionConsumer(diContainer.cradle, {
locatorConfig: { queueUrl: consumer1.queueProps.url },
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SqsPermissionPublisher(diContainer.cradle, {
locatorConfig: { queueUrl: consumer1.queueProps.url },
})

await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()

// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)

expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
})
})
})
9 changes: 7 additions & 2 deletions packages/sqs/test/consumers/SqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSA

type SqsPermissionConsumerOptions = Pick<
SQSConsumerOptions<SupportedMessages, ExecutionContext, PrehandlerOutput>,
'creationConfig' | 'locatorConfig' | 'logMessages' | 'deletionConfig' | 'deadLetterQueue'
| 'creationConfig'
| 'locatorConfig'
| 'logMessages'
| 'deletionConfig'
| 'deadLetterQueue'
| 'consumerOverrides'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand Down Expand Up @@ -87,7 +92,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer<
deadLetterQueue: options.deadLetterQueue,
messageTypeField: 'messageType',
handlerSpy: true,
consumerOverrides: {
consumerOverrides: options.consumerOverrides ?? {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
handlers: new MessageHandlerConfigBuilder<
Expand Down

0 comments on commit fd40520

Please sign in to comment.