Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visibility timeout heartbeat #124

Merged
merged 20 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "12.1.1",
"version": "13.0.0",
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before we release this version, let's also merge #121, so that we don't need to do semver majors in a row

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great to me 🙇 I will wait to merge this PR

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel free to merge, just don't hurry releasing it :D.
After you are done, I will address any conflicts if there are on the other PR, and release both changes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right, this repo doesn't have auto-release 😅 sorry, I was confused, thanks for the clarification 🙏

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merged, please let me know If can help with conflicts or release 🙇

"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand All @@ -25,15 +25,15 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.10.1",
"@lokalise/node-core": "^9.14.0",
"sqs-consumer": "^9.1.0",
"zod": "^3.22.4"
},
"peerDependencies": {
"@aws-sdk/client-sns": "^3.476.0",
"@aws-sdk/client-sqs": "^3.476.0",
"@message-queue-toolkit/core": "^10.1.1",
"@message-queue-toolkit/sqs": "^12.1.1"
"@message-queue-toolkit/sqs": "^13.0.0"
},
"devDependencies": {
"@aws-sdk/client-sns": "^3.529.1",
Expand Down
78 changes: 76 additions & 2 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { setTimeout } from 'node:timers/promises'

import type { SNSClient } from '@aws-sdk/client-sns'
import type { SQSClient } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import { assertQueue, deleteQueue, getQueueAttributes } from '@message-queue-toolkit/sqs'
import type { AwilixContainer } from 'awilix'
import { asValue } from 'awilix'
import { describe, beforeEach, afterEach, expect, it, beforeAll } from 'vitest'

import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils'
import type { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
import { SnsPermissionPublisher } from '../publishers/SnsPermissionPublisher'
import { registerDependencies } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

Expand Down Expand Up @@ -88,6 +92,7 @@ describe('SnsSqsPermissionConsumer', () => {
QueueName: 'existingQueue',
Attributes: {
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
},
},
updateAttributesIfExists: true,
Expand All @@ -106,7 +111,10 @@ describe('SnsSqsPermissionConsumer', () => {
queueUrl: newConsumer.subscriptionProps.queueUrl,
})

expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue')
expect(attributes.result?.attributes).toMatchObject({
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
})
})

it('updates existing queue when one with different attributes exist and sets the policy', async () => {
Expand Down Expand Up @@ -435,4 +443,70 @@ describe('SnsSqsPermissionConsumer', () => {
})
})
})

describe('visibility timeout', () => {
const topicName = 'myTestTopic'
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>

beforeEach(async () => {
diContainer = await registerDependencies({
permissionConsumer: asValue(() => undefined),
permissionPublisher: asValue(() => undefined),
})
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SnsSqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
topic: { Name: topicName },
queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } },
},
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(2800) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()

const consumer2 = new SnsSqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: consumer1.subscriptionProps.queueUrl,
topicArn: consumer1.subscriptionProps.topicArn,
subscriptionArn: consumer1.subscriptionProps.subscriptionArn,
},
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SnsPermissionPublisher(diContainer.cradle, {
locatorConfig: { topicArn: consumer1.subscriptionProps.topicArn },
})

await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()

// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should assert that it resolves to true and not just timeouts

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check is done using counters, in the second each case I don't only want to know it is false, we should check that consumer1 was de only who received the message


expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
})
})
})
8 changes: 4 additions & 4 deletions packages/sns/test/consumers/SnsSqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type PrehandlerOutput = {

type SnsSqsPermissionConsumerOptions = Pick<
SNSSQSConsumerOptions<SupportedMessages, ExecutionContext, PrehandlerOutput>,
'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue'
'creationConfig' | 'locatorConfig' | 'deletionConfig' | 'deadLetterQueue' | 'consumerOverrides'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand Down Expand Up @@ -143,6 +143,9 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
deletionConfig: options.deletionConfig ?? {
deleteIfExists: true,
},
consumerOverrides: options.consumerOverrides ?? {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
deadLetterQueue: options.deadLetterQueue,
...(options.locatorConfig
? { locatorConfig: options.locatorConfig }
Expand All @@ -153,9 +156,6 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer<
},
}),
messageTypeField: 'messageType',
consumerOverrides: {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
subscriptionConfig: {
updateAttributesIfExists: false,
},
Expand Down
40 changes: 34 additions & 6 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type { ConsumerOptions } from 'sqs-consumer/src/types'
import type { SQSMessage } from '../types/MessageTypes'
import { deleteSqs, initSqs } from '../utils/sqsInitter'
import { readSqsMessage } from '../utils/sqsMessageReader'
import { getQueueAttributes } from '../utils/sqsUtils'

import type { SQSCreationConfig, SQSDependencies, SQSQueueLocatorType } from './AbstractSqsService'
import { AbstractSqsService } from './AbstractSqsService'
Expand Down Expand Up @@ -54,8 +55,16 @@ export type SQSConsumerOptions<
SQSCreationConfig,
SQSQueueLocatorType
> & {
consumerOverrides?: Partial<ConsumerOptions>
/**
* Omitting properties which will be set internally ins this class
* `visibilityTimeout` is also omitted to avoid conflicts with queue config
*/
consumerOverrides?: Omit<
ConsumerOptions,
'sqs' | 'queueUrl' | 'handler' | 'handleMessageBatch' | 'visibilityTimeout'
>
}

export abstract class AbstractSqsConsumer<
MessagePayloadType extends object,
ExecutionContext,
Expand Down Expand Up @@ -170,12 +179,15 @@ export abstract class AbstractSqsConsumer<

public async start() {
await this.init()
if (this.consumer) this.consumer.stop()

const visibilityTimeout = await this.getQueueVisibilityTimeout()

if (this.consumer) {
this.consumer.stop()
}
this.consumer = Consumer.create({
sqs: this.sqsClient,
queueUrl: this.queueUrl,
visibilityTimeout,
...this.consumerOptionsOverride,
handleMessage: async (message: SQSMessage) => {
if (message === null) return

Expand Down Expand Up @@ -235,8 +247,6 @@ export abstract class AbstractSqsConsumer<

return Promise.reject(result.error)
},
sqs: this.sqsClient,
...this.consumerOptionsOverride,
})

this.consumer.on('error', (err) => {
Expand Down Expand Up @@ -411,4 +421,22 @@ export abstract class AbstractSqsConsumer<
})
await this.sqsClient.send(command)
}

private async getQueueVisibilityTimeout(): Promise<number | undefined> {
let visibilityTimeoutString
if (this.creationConfig) {
visibilityTimeoutString = this.creationConfig.queue.Attributes?.VisibilityTimeout
} else {
// if user is using locatorConfig, we should look into queue config
const queueAttributes = await getQueueAttributes(
this.sqsClient,
{ queueUrl: this.queueUrl },
['VisibilityTimeout'],
)
visibilityTimeoutString = queueAttributes.result?.attributes?.VisibilityTimeout
}

// parseInt is safe because if the value is not a number process should have failed on init
return visibilityTimeoutString ? parseInt(visibilityTimeoutString) : undefined
}
}
4 changes: 2 additions & 2 deletions packages/sqs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sqs",
"version": "12.1.1",
"version": "13.0.0",
"private": false,
"license": "MIT",
"description": "SQS adapter for message-queue-toolkit",
Expand All @@ -25,7 +25,7 @@
"prepublishOnly": "npm run build:release"
},
"dependencies": {
"@lokalise/node-core": "^9.10.1",
"@lokalise/node-core": "^9.14.0",
"sqs-consumer": "^9.1.0",
"zod": "^3.22.4"
},
Expand Down
80 changes: 77 additions & 3 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { setTimeout } from 'node:timers/promises'

import type { SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs'
import { SendMessageCommand, ReceiveMessageCommand } from '@aws-sdk/client-sqs'
import { waitAndRetry } from '@lokalise/node-core'
import type { BarrierResult } from '@message-queue-toolkit/core'
import type { AwilixContainer } from 'awilix'
import { asClass, asFunction } from 'awilix'
import { asValue, asClass, asFunction } from 'awilix'
import { describe, beforeEach, afterEach, expect, it } from 'vitest'
import { ZodError } from 'zod'

import { FakeConsumerErrorResolver } from '../../lib/fakes/FakeConsumerErrorResolver'
import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils'
import { FakeLogger } from '../fakes/FakeLogger'
import type { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher'
import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'
import type { Dependencies } from '../utils/testContext'

Expand Down Expand Up @@ -75,6 +77,7 @@ describe('SqsPermissionConsumer', () => {
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
},
},
updateAttributesIfExists: true,
Expand All @@ -101,7 +104,10 @@ describe('SqsPermissionConsumer', () => {
queueUrl: newConsumer.queueProps.url,
})

expect(attributes.result?.attributes!.KmsMasterKeyId).toBe('othervalue')
expect(attributes.result?.attributes).toMatchObject({
KmsMasterKeyId: 'othervalue',
VisibilityTimeout: '10',
})
})

it('does not update existing queue when attributes did not change', async () => {
Expand Down Expand Up @@ -478,4 +484,72 @@ describe('SqsPermissionConsumer', () => {
expect(consumer.removeCounter).toBe(2)
})
})

describe('visibility timeout', () => {
const queueName = 'myTestQueue'
let diContainer: AwilixContainer<Dependencies>

beforeEach(async () => {
diContainer = await registerDependencies({
permissionPublisher: asValue(() => undefined),
permissionConsumer: asValue(() => undefined),
})
})

afterEach(async () => {
await diContainer.cradle.awilixManager.executeDispose()
await diContainer.dispose()
})

it('heartbeatInterval should be less than visibilityTimeout', async () => {
const consumer = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '1' } } },
consumerOverrides: { heartbeatInterval: 2 },
})
await expect(() => consumer.start()).rejects.toThrow(
/heartbeatInterval must be less than visibilityTimeout/,
)
})

it.each([false, true])('using 2 consumers with heartbeat -> %s', async (heartbeatEnabled) => {
let consumer1IsProcessing = false
let consumer1Counter = 0
let consumer2Counter = 0

const consumer1 = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: { queue: { QueueName: queueName, Attributes: { VisibilityTimeout: '2' } } },
consumerOverrides: { heartbeatInterval: heartbeatEnabled ? 1 : undefined },
removeHandlerOverride: async () => {
consumer1IsProcessing = true
await setTimeout(2800) // Wait to the visibility timeout to expire
consumer1Counter++
consumer1IsProcessing = false
return { result: 'success' }
},
})
await consumer1.start()

const consumer2 = new SqsPermissionConsumer(diContainer.cradle, {
locatorConfig: { queueUrl: consumer1.queueProps.url },
removeHandlerOverride: async () => {
consumer2Counter++
return { result: 'success' }
},
})
const publisher = new SqsPermissionPublisher(diContainer.cradle, {
locatorConfig: { queueUrl: consumer1.queueProps.url },
})

await publisher.publish({ id: '10', messageType: 'remove' })
// wait for consumer1 to start processing to start second consumer
await waitAndRetry(() => consumer1IsProcessing, 5, 5)
await consumer2.start()

// wait for both consumers to process message
await waitAndRetry(() => consumer1Counter > 0 && consumer2Counter > 0, 100, 40)

expect(consumer1Counter).toBe(1)
expect(consumer2Counter).toBe(heartbeatEnabled ? 0 : 1)
})
})
})
9 changes: 7 additions & 2 deletions packages/sqs/test/consumers/SqsPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSA

type SqsPermissionConsumerOptions = Pick<
SQSConsumerOptions<SupportedMessages, ExecutionContext, PrehandlerOutput>,
'creationConfig' | 'locatorConfig' | 'logMessages' | 'deletionConfig' | 'deadLetterQueue'
| 'creationConfig'
| 'locatorConfig'
| 'logMessages'
| 'deletionConfig'
| 'deadLetterQueue'
| 'consumerOverrides'
> & {
addPreHandlerBarrier?: (
message: SupportedMessages,
Expand Down Expand Up @@ -87,7 +92,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer<
deadLetterQueue: options.deadLetterQueue,
messageTypeField: 'messageType',
handlerSpy: true,
consumerOverrides: {
consumerOverrides: options.consumerOverrides ?? {
terminateVisibilityTimeout: true, // this allows to retry failed messages immediately
},
handlers: new MessageHandlerConfigBuilder<
Expand Down
Loading