Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dead letter queue #117

Merged
merged 33 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8e975b5
Minor dependency fixes
CarlosGamero Apr 1, 2024
cad4200
Import fixes
CarlosGamero Apr 1, 2024
00ea47b
dead letter queue param types
CarlosGamero Apr 1, 2024
0ab0d13
Minor test improvement
CarlosGamero Apr 1, 2024
f064a5d
Preparing for TDD
CarlosGamero Apr 1, 2024
9f76454
DLQ first version
CarlosGamero Apr 1, 2024
cebd9b5
Adding some tests to DLQ init
CarlosGamero Apr 1, 2024
8629d1b
DLQ deletion
CarlosGamero Apr 1, 2024
d6afb0f
DLQ tests moved to a new file
CarlosGamero Apr 1, 2024
a99b532
Implementing failProcessing + test
CarlosGamero Apr 1, 2024
9e771bb
Minor changes
CarlosGamero Apr 1, 2024
290952e
Adding TDD test case
CarlosGamero Apr 1, 2024
80375e9
Fixing build
CarlosGamero Apr 1, 2024
0ad53c3
Fix build
CarlosGamero Apr 1, 2024
626039c
Fixing test
CarlosGamero Apr 1, 2024
0f87b23
Improving dlq creation
CarlosGamero Apr 1, 2024
70c2544
Removing DLQ default name
CarlosGamero Apr 1, 2024
4417dda
Fixing issue with DLQ deletion config
CarlosGamero Apr 1, 2024
0d77e8b
SQS fixing retryLater
CarlosGamero Apr 2, 2024
5bf6527
Moving DLQ types to core
CarlosGamero Apr 2, 2024
0024919
core release prepare
CarlosGamero Apr 2, 2024
5ec0dc2
sqs release prepare
CarlosGamero Apr 2, 2024
a44372a
SNS release prepare
CarlosGamero Apr 2, 2024
c0d8ce8
amqp handling dlq param + tests
CarlosGamero Apr 2, 2024
3021041
amqp prepare for release
CarlosGamero Apr 2, 2024
2d0e67a
Adding doc link
CarlosGamero Apr 2, 2024
ce2145d
Adding doc
CarlosGamero Apr 2, 2024
4dec7a7
Fixing minor doc issue
CarlosGamero Apr 2, 2024
1b0d2cd
Improving doc
CarlosGamero Apr 2, 2024
bc04b4b
Extracting SQS DLD params from common type
CarlosGamero Apr 2, 2024
0c86f93
Fix test
CarlosGamero Apr 2, 2024
8008b41
Improving doc
CarlosGamero Apr 2, 2024
0c15baf
Typos
CarlosGamero Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions packages/sqs/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
export type {
SQSConsumerDependencies,
SQSQueueLocatorType,
ExtraSQSCreationParams,
SQSDependencies,
SQSCreationConfig,
SQSQueueLocatorType,
} from './lib/sqs/AbstractSqsService'

export * from './lib/sqs/AbstractSqsConsumer'
export { SqsConsumerErrorResolver } from './lib/errors/SqsConsumerErrorResolver'

export type { SQSConsumerDependencies, SQSConsumerOptions } from './lib/sqs/AbstractSqsConsumer'
export { AbstractSqsConsumer } from './lib/sqs/AbstractSqsConsumer'

export { AbstractSqsPublisher } from './lib/sqs/AbstractSqsPublisher'
export type { SQSMessageOptions } from './lib/sqs/AbstractSqsPublisher'

Expand Down
124 changes: 110 additions & 14 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { CreateQueueRequest } from '@aws-sdk/client-sqs'
import { SendMessageCommand, SetQueueAttributesCommand } from '@aws-sdk/client-sqs'
import type { Either, ErrorResolver } from '@lokalise/node-core'
import type {
QueueConsumer as QueueConsumer,
Expand All @@ -7,6 +7,7 @@ import type {
PrehandlingOutputs,
Prehandler,
BarrierResult,
QueueConsumerDependencies,
} from '@message-queue-toolkit/core'
import {
isMessageError,
Expand All @@ -18,24 +19,51 @@ import { Consumer } from 'sqs-consumer'
import type { ConsumerOptions } from 'sqs-consumer/src/types'

import type { SQSMessage } from '../types/MessageTypes'
import { deleteSqs, initSqs } from '../utils/sqsInitter'
import { readSqsMessage } from '../utils/sqsMessageReader'

import type { SQSConsumerDependencies, SQSQueueLocatorType } from './AbstractSqsService'
import type { SQSCreationConfig, SQSDependencies, SQSQueueLocatorType } from './AbstractSqsService'
import { AbstractSqsService } from './AbstractSqsService'

const ABORT_EARLY_EITHER: Either<'abort', never> = {
error: 'abort',
}

export type ExtraSQSCreationParams = {
topicArnsWithPublishPermissionsPrefix?: string
updateAttributesIfExists?: boolean
// TODO: should we include DLQ types on core?
type DeadLetterQueueCreationConfig<CreationConfigType extends SQSCreationConfig> = Omit<
CreationConfigType,
'queue'
> & {
queue: Omit<CreationConfigType['queue'], 'QueueName'> &
(
| {
queueNameSuffix?: string
QueueName?: never
}
| {
queueNameSuffix?: never
QueueName?: string
}
)
}

CarlosGamero marked this conversation as resolved.
Show resolved Hide resolved
export type SQSCreationConfig = {
queue: CreateQueueRequest
updateAttributesIfExists?: boolean
} & ExtraSQSCreationParams
type DeadLetterQueueOptions<
CreationConfigType extends SQSCreationConfig,
QueueLocatorType extends SQSQueueLocatorType,
> = {
redrivePolicy: { maxReceiveCount: number }
} & (
| {
creationConfig?: DeadLetterQueueCreationConfig<CreationConfigType>
locatorConfig?: never
}
| {
creationConfig?: never
locatorConfig?: QueueLocatorType
}
)

export type SQSConsumerDependencies = SQSDependencies & QueueConsumerDependencies

export type SQSConsumerOptions<
MessagePayloadSchemas extends object,
Expand All @@ -51,6 +79,7 @@ export type SQSConsumerOptions<
PrehandlerOutput
> & {
consumerOverrides?: Partial<ConsumerOptions>
deadLetterQueue?: DeadLetterQueueOptions<CreationConfigType, QueueLocatorType>
}
export abstract class AbstractSqsConsumer<
MessagePayloadType extends object,
Expand Down Expand Up @@ -97,6 +126,13 @@ export abstract class AbstractSqsConsumer<
protected readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
protected readonly executionContext: ExecutionContext

private readonly deadLetterQueueOptions?: DeadLetterQueueOptions<
CreationConfigType,
QueueLocatorType
>

protected deadLetterQueueUrl?: string

protected constructor(
dependencies: SQSConsumerDependencies,
options: ConsumerOptionsType,
Expand All @@ -107,8 +143,9 @@ export abstract class AbstractSqsConsumer<
this.errorResolver = dependencies.consumerErrorResolver

this.consumerOptionsOverride = options.consumerOverrides ?? {}
const messageSchemas = options.handlers.map((entry) => entry.schema)
this.deadLetterQueueOptions = options.deadLetterQueue

const messageSchemas = options.handlers.map((entry) => entry.schema)
this.messageSchemaContainer = new MessageSchemaContainer<MessagePayloadType>({
messageSchemas,
messageTypeField: options.messageTypeField,
Expand All @@ -124,6 +161,61 @@ export abstract class AbstractSqsConsumer<
this.executionContext = executionContext
}

override async init(): Promise<void> {
await super.init()
await this.initDeadLetterQueue()
}

private async initDeadLetterQueue() {
if (!this.deadLetterQueueOptions) return

const dlqCreationConfig = this.resolvedDlqCreationConfig(this.deadLetterQueueOptions)
if (this.deletionConfig && dlqCreationConfig) {
await deleteSqs(this.sqsClient, this.deletionConfig, dlqCreationConfig)
}

const result = await initSqs(
this.sqsClient,
this.deadLetterQueueOptions.locatorConfig,
dlqCreationConfig,
)
const updateAttrCommand = new SetQueueAttributesCommand({
QueueUrl: this.queueUrl,
Attributes: {
RedrivePolicy: JSON.stringify({
deadLetterTargetArn: result.queueArn,
maxReceiveCount: this.deadLetterQueueOptions?.redrivePolicy.maxReceiveCount,
}),
},
})
await this.sqsClient.send(updateAttrCommand)

this.deadLetterQueueUrl = result.queueUrl
}

private resolvedDlqCreationConfig(
deadLetterQueueOptions: DeadLetterQueueOptions<CreationConfigType, QueueLocatorType>,
): SQSCreationConfig | undefined {
if (deadLetterQueueOptions.locatorConfig) return undefined

let dlqName
if (deadLetterQueueOptions.creationConfig?.queue.QueueName) {
dlqName = deadLetterQueueOptions.creationConfig.queue.QueueName
} else if (deadLetterQueueOptions.creationConfig?.queue.queueNameSuffix) {
dlqName = `${this.queueName}${deadLetterQueueOptions.creationConfig.queue.queueNameSuffix}`
} else {
dlqName = `${this.queueName}-dlq`
}

return {
...deadLetterQueueOptions.creationConfig,
queue: {
...deadLetterQueueOptions.creationConfig?.queue,
QueueName: dlqName,
},
}
}

public async start() {
await this.init()

Expand All @@ -133,7 +225,6 @@ export abstract class AbstractSqsConsumer<
this.consumer = Consumer.create({
queueUrl: this.queueUrl,
handleMessage: async (message: SQSMessage) => {
/* c8 ignore next */
if (message === null) return

const deserializedMessage = this.deserializeMessage(message)
Expand Down Expand Up @@ -295,7 +386,6 @@ export abstract class AbstractSqsConsumer<
return ABORT_EARLY_EITHER
}
// Empty content for whatever reason
/* c8 ignore next */
if (!resolveMessageResult.result) return ABORT_EARLY_EITHER

// @ts-ignore
Expand Down Expand Up @@ -352,7 +442,13 @@ export abstract class AbstractSqsConsumer<
}
}

private async failProcessing(_message: SQSMessage) {
// Not implemented yet - needs dead letter queue
private async failProcessing(message: SQSMessage) {
if (!this.deadLetterQueueUrl) return

const command = new SendMessageCommand({
QueueUrl: this.deadLetterQueueUrl,
MessageBody: message.Body,
})
await this.sqsClient.send(command)
}
}
3 changes: 1 addition & 2 deletions packages/sqs/lib/sqs/AbstractSqsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ import type { ZodSchema } from 'zod'

import type { SQSMessage } from '../types/MessageTypes'

import type { SQSCreationConfig } from './AbstractSqsConsumer'
import { AbstractSqsService } from './AbstractSqsService'
import type { SQSDependencies, SQSQueueLocatorType } from './AbstractSqsService'
import type { SQSDependencies, SQSQueueLocatorType, SQSCreationConfig } from './AbstractSqsService'

export type SQSMessageOptions = {
MessageGroupId?: string
Expand Down
20 changes: 11 additions & 9 deletions packages/sqs/lib/sqs/AbstractSqsService.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import type { SQSClient } from '@aws-sdk/client-sqs'
import type {
QueueConsumerDependencies,
QueueDependencies,
QueueOptions,
} from '@message-queue-toolkit/core'
import type { CreateQueueRequest, SQSClient } from '@aws-sdk/client-sqs'
import type { QueueDependencies, QueueOptions } from '@message-queue-toolkit/core'
import { AbstractQueueService } from '@message-queue-toolkit/core'

import type { SQSMessage } from '../types/MessageTypes'
import { deleteSqs, initSqs } from '../utils/sqsInitter'

import type { SQSCreationConfig } from './AbstractSqsConsumer'

export type SQSDependencies = QueueDependencies & {
sqsClient: SQSClient
}

export type SQSConsumerDependencies = SQSDependencies & QueueConsumerDependencies
export type ExtraSQSCreationParams = {
topicArnsWithPublishPermissionsPrefix?: string
updateAttributesIfExists?: boolean
}

export type SQSCreationConfig = {
queue: CreateQueueRequest
updateAttributesIfExists?: boolean
} & ExtraSQSCreationParams

export type SQSQueueLocatorType = {
queueUrl: string
Expand Down
3 changes: 1 addition & 2 deletions packages/sqs/lib/utils/sqsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import type { SQSClient, QueueAttributeName } from '@aws-sdk/client-sqs'
import type { DeletionConfig } from '@message-queue-toolkit/core'
import { isProduction } from '@message-queue-toolkit/core'

import type { SQSCreationConfig } from '../sqs/AbstractSqsConsumer'
import type { SQSQueueLocatorType } from '../sqs/AbstractSqsService'
import type { SQSCreationConfig, SQSQueueLocatorType } from '../sqs/AbstractSqsService'

import { assertQueue, deleteQueue, getQueueAttributes } from './sqsUtils'

Expand Down
3 changes: 1 addition & 2 deletions packages/sqs/lib/utils/sqsUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import type { CreateQueueCommandInput, SQSClient, QueueAttributeName } from '@aw
import type { Either } from '@lokalise/node-core'
import { isShallowSubset, waitAndRetry } from '@message-queue-toolkit/core'

import type { ExtraSQSCreationParams } from '../sqs/AbstractSqsConsumer'
import type { SQSQueueLocatorType } from '../sqs/AbstractSqsService'
import type { ExtraSQSCreationParams, SQSQueueLocatorType } from '../sqs/AbstractSqsService'

import { generateQueuePublishForTopicPolicy } from './sqsAttributeUtils'
import { updateQueueAttributes } from './sqsInitter'
Expand Down
25 changes: 14 additions & 11 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ import { SqsPermissionConsumer } from './SqsPermissionConsumer'

describe('SqsPermissionConsumer', () => {
describe('init', () => {
const queueName = 'myTestQueue'

let diContainer: AwilixContainer<Dependencies>
let sqsClient: SQSClient

beforeEach(async () => {
diContainer = await registerDependencies()
sqsClient = diContainer.cradle.sqsClient
await deleteQueue(sqsClient, 'existingQueue')
await deleteQueue(sqsClient, queueName)
})

afterEach(async () => {
Expand All @@ -34,7 +37,7 @@ describe('SqsPermissionConsumer', () => {
it('throws an error when invalid queue locator is passed', async () => {
const newConsumer = new SqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: 'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue',
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
},
})

Expand All @@ -43,24 +46,24 @@ describe('SqsPermissionConsumer', () => {

it('does not create a new queue when queue locator is passed', async () => {
await assertQueue(sqsClient, {
QueueName: 'existingQueue',
QueueName: queueName,
})

const newConsumer = new SqsPermissionConsumer(diContainer.cradle, {
locatorConfig: {
queueUrl: 'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue',
queueUrl: `http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
},
})

await newConsumer.init()
expect(newConsumer.queueProps.url).toBe(
'http://s3.localhost.localstack.cloud:4566/000000000000/existingQueue',
`http://s3.localhost.localstack.cloud:4566/000000000000/${queueName}`,
)
})

it('updates existing queue when one with different attributes exist', async () => {
await assertQueue(sqsClient, {
QueueName: 'existingQueue',
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'somevalue',
},
Expand All @@ -69,7 +72,7 @@ describe('SqsPermissionConsumer', () => {
const newConsumer = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
queue: {
QueueName: 'existingQueue',
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'othervalue',
},
Expand All @@ -86,7 +89,7 @@ describe('SqsPermissionConsumer', () => {

await newConsumer.init()
expect(newConsumer.queueProps.url).toBe(
'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue',
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)

const updateCall = sqsSpy.mock.calls.find((entry) => {
Expand All @@ -103,7 +106,7 @@ describe('SqsPermissionConsumer', () => {

it('does not update existing queue when attributes did not change', async () => {
await assertQueue(sqsClient, {
QueueName: 'existingQueue',
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'somevalue',
},
Expand All @@ -112,7 +115,7 @@ describe('SqsPermissionConsumer', () => {
const newConsumer = new SqsPermissionConsumer(diContainer.cradle, {
creationConfig: {
queue: {
QueueName: 'existingQueue',
QueueName: queueName,
Attributes: {
KmsMasterKeyId: 'somevalue',
},
Expand All @@ -129,7 +132,7 @@ describe('SqsPermissionConsumer', () => {

await newConsumer.init()
expect(newConsumer.queueProps.url).toBe(
'http://sqs.eu-west-1.localstack:4566/000000000000/existingQueue',
`http://sqs.eu-west-1.localstack:4566/000000000000/${queueName}`,
)

const updateCall = sqsSpy.mock.calls.find((entry) => {
Expand Down
Loading
Loading