Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AP-3648 SQS exponential delay between retries #150

Merged
merged 31 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4d79570
Adding required methods to auto-filling internal props
CarlosGamero May 23, 2024
7ba3667
Adjusting sqs to new methods
CarlosGamero May 23, 2024
e12151f
Fixing mutation issue when adding new properties
CarlosGamero May 23, 2024
7846b7b
Mutation ssue fix + Sqs adjust
CarlosGamero May 23, 2024
c2d9273
Sns adjusted
CarlosGamero May 23, 2024
d0b4a47
Amqp adjusted
CarlosGamero May 23, 2024
08f2c04
Adding common delay calculation method
CarlosGamero May 23, 2024
b700281
Spy is using parsed message instead of original one
CarlosGamero May 23, 2024
2447665
Implementing delay on SQS
CarlosGamero May 23, 2024
5767360
Lint fixes
CarlosGamero May 23, 2024
ac03b1f
Fixing tests + adding TODO
CarlosGamero May 23, 2024
f0fffc3
Error fix
CarlosGamero May 23, 2024
38cc9a0
Test fixes
CarlosGamero May 23, 2024
851c7c4
And more test fixes
CarlosGamero May 23, 2024
a63f334
Testing exponential delay
CarlosGamero May 23, 2024
edf12c0
One more test fix
CarlosGamero May 23, 2024
3ba05da
Test [will be reverted]
CarlosGamero May 23, 2024
bf51f7d
Revert "Test [will be reverted]"
CarlosGamero May 23, 2024
2b953eb
Trying to fix tests on CI
CarlosGamero May 23, 2024
8c49c9d
Better test for exp delay
CarlosGamero May 23, 2024
d3a0fa8
Fixing issue on test
CarlosGamero May 23, 2024
c7b140e
Test fix
CarlosGamero May 23, 2024
cc06d95
Improving tests
CarlosGamero May 24, 2024
9e79981
Test fix
CarlosGamero May 24, 2024
ad0d266
SNS tests
CarlosGamero May 24, 2024
0e59805
Lint fixes
CarlosGamero May 25, 2024
e49731d
Merge branch 'main' into AP-3648_SQS_retry_delay
CarlosGamero May 25, 2024
a5233c0
Release prepare - minor version
CarlosGamero May 25, 2024
a6b619c
Merge branch 'main' into AP-3648_SQS_retry_delay
CarlosGamero May 26, 2024
8cdbfe7
Adding JSdoc to number of retries prop
CarlosGamero May 26, 2024
33dc101
Merge branch 'main' into AP-3648_SQS_retry_delay
CarlosGamero May 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type {
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import {
isRetryDateExceeded,
isMessageError,
parseMessage,
HandlerContainer,
Expand Down Expand Up @@ -153,27 +152,26 @@ export abstract class AbstractAmqpConsumer<
.then((result) => {
if (result.result === 'success') {
this.channel.ack(message)
this.handleMessageProcessed(originalMessage, 'consumed')
this.handleMessageProcessed(parsedMessage, 'consumed')
Comment on lines -173 to +172
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to always use parsed message on spy as it can break tests using toEqual

return
}

// retryLater
const timestamp = this.tryToExtractTimestamp(originalMessage) ?? new Date()
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
if (!isRetryDateExceeded(timestamp, this.maxRetryDuration)) {
if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) {
// TODO: Add retry delay + republish message updating internal properties
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not implemented within this PR, adding todo

this.channel.nack(message, false, true)
this.handleMessageProcessed(originalMessage, 'retryLater')
this.handleMessageProcessed(parsedMessage, 'retryLater')
} else {
// ToDo move message to DLQ once it is implemented
this.channel.ack(message)
this.handleMessageProcessed(originalMessage, 'error')
this.handleMessageProcessed(parsedMessage, 'error')
}
})
.catch((err) => {
// ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter
// If we fail due to unknown reason, let's retry
this.channel.nack(message, false, true)
this.handleMessageProcessed(originalMessage, 'retryLater')
this.handleMessageProcessed(parsedMessage, 'retryLater')
this.handleError(err)
})
.finally(() => {
Expand Down
12 changes: 2 additions & 10 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,14 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object, M
return
}

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
message[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

try {
this.publishInternal(objectToBuffer(message), options)
} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/amqp",
"version": "15.1.1",
"version": "15.2.0",
"private": false,
"license": "MIT",
"description": "AMQP adapter for message-queue-toolkit",
Expand Down Expand Up @@ -29,7 +29,7 @@
"zod": "^3.23.8"
},
"peerDependencies": {
"@message-queue-toolkit/core": "^13.3.1",
"@message-queue-toolkit/core": "^13.4.0",
"amqplib": "^0.10.3"
},
"devDependencies": {
Expand Down
17 changes: 8 additions & 9 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import type { Dependencies } from '../utils/testContext'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'

import { AmqpPermissionConsumer } from './AmqpPermissionConsumer'
import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas'
import type {
PERMISSIONS_ADD_MESSAGE_TYPE,
PERMISSIONS_REMOVE_MESSAGE_TYPE,
} from './userConsumerSchemas'

describe('AmqpPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -90,15 +93,15 @@ describe('AmqpPermissionConsumer', () => {
expect(logger.loggedMessages.length).toBe(5)
expect(logger.loggedMessages).toEqual([
'Propagating new connection across 0 receivers',
'timestamp not defined, adding it automatically',
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
},
'timestamp not defined, adding it automatically',
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
},
{
messageId: '1',
Expand Down Expand Up @@ -407,11 +410,9 @@ describe('AmqpPermissionConsumer', () => {
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)
Expand All @@ -432,11 +433,9 @@ describe('AmqpPermissionConsumer', () => {
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
const message: PERMISSIONS_REMOVE_MESSAGE_TYPE = {
id: '1',
messageType: 'remove',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/test/consumers/userConsumerSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({
export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('add'),
timestamp: z.string().or(z.date()).optional(),
})

export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('remove'),
timestamp: z.string().or(z.date()).optional(),
})

export type PERMISSIONS_MESSAGE_TYPE = z.infer<typeof PERMISSIONS_MESSAGE_SCHEMA>
Expand Down
9 changes: 6 additions & 3 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ describe('PermissionPublisher', () => {
return logger.loggedMessages.length === 2
})

expect(logger.loggedMessages[2]).toEqual({
expect(logger.loggedMessages[1]).toEqual({
id: '1',
messageType: 'add',
timestamp: expect.any(String),
})
})
})
Expand Down Expand Up @@ -245,6 +244,7 @@ describe('PermissionPublisher', () => {
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
_internalNumberOfRetries: 0,
},
})
})
Expand Down Expand Up @@ -292,11 +292,12 @@ describe('PermissionPublisher', () => {
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
_internalNumberOfRetries: 0,
},
})
})

it('publishes a message auto-filling timestamp', async () => {
it('publishes a message auto-filling internal properties', async () => {
await permissionConsumer.close()

const message = {
Expand Down Expand Up @@ -325,11 +326,13 @@ describe('PermissionPublisher', () => {
parsedMessage: {
id: '2',
messageType: 'add',
timestamp: expect.any(String),
},
originalMessage: {
id: '2',
messageType: 'add',
timestamp: expect.any(String),
_internalNumberOfRetries: 0,
},
})
})
Expand Down
59 changes: 57 additions & 2 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { ZodSchema, ZodType } from 'zod'
import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
import type { Logger, MessageProcessingResult } from '../types/MessageQueueTypes'
import type { DeletionConfig, QueueDependencies, QueueOptions } from '../types/queueOptionsTypes'
import { isRetryDateExceeded } from '../utils/dateUtils'
import { toDatePreprocessor } from '../utils/toDateProcessor'

import type {
Expand Down Expand Up @@ -42,11 +43,15 @@ export abstract class AbstractQueueService<
ExecutionContext = undefined,
PrehandlerOutput = undefined,
> {
// Used to keep track of the number of retries performed on consumer
private readonly messageNumberOfRetriesField = '_internalNumberOfRetries'
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry naming sounds confusing for me in this context, because it's not clear how we are counting them - so first attempt is retry zero, and then second attempt is retry 1?

BullMQ naming of attempts is clearer for me; we can start at 1 (default when field is not set) and apply consistent counting logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me! will apply the change thanks Igor 🙇

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, thinking about it, I am not really sure, this counting will only be applied on retryLater cases, meaning that errors will go with the SQS usual retry logic flow.

retry here means, the number of times the consumer finished with retryLater result, naming it attempt I think can be more confusing, maybe something _internalNumberOfRetryLater to make it more clear?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point. maybe we can explain this with a jsdoc, without naming changes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course! I will add it in a bit :D thanks Igor 🙏

// Used to avoid infinite retries on the same message
protected readonly messageTimestampField: string

protected readonly errorReporter: ErrorReporter
public readonly logger: Logger
protected readonly messageIdField: string
protected readonly messageTypeField: string
protected readonly messageTimestampField: string
protected readonly logMessages: boolean
protected readonly creationConfig?: QueueConfiguration
protected readonly locatorConfig?: QueueLocatorType
Expand Down Expand Up @@ -198,7 +203,45 @@ export abstract class AbstractQueueService<
return await barrier(message, executionContext, preHandlerOutput)
}

protected tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined {
shouldBeRetried(message: MessagePayloadSchemas, maxRetryDuration: number): boolean {
const timestamp = this.tryToExtractTimestamp(message) ?? new Date()
return !isRetryDateExceeded(timestamp, maxRetryDuration)
}

protected getMessageRetryDelayInSeconds(message: MessagePayloadSchemas): number {
// if not defined, this is the first attempt
const retries = this.tryToExtractNumberOfRetries(message) ?? 0

// exponential backoff -> (2 ^ (attempts)) * delay
// delay = 1 second
return Math.pow(2, retries)
}

protected updateInternalProperties(message: MessagePayloadSchemas): MessagePayloadSchemas {
const messageCopy = { ...message } // clone the message to avoid mutation

/**
* If the message doesn't have a timestamp field -> add it
* will be used to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
messageCopy[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}

/**
* add/increment the number of retries performed to exponential message delay
*/
const numberOfRetries = this.tryToExtractNumberOfRetries(message)
// @ts-ignore
messageCopy[this.messageNumberOfRetriesField] =
numberOfRetries !== undefined ? numberOfRetries + 1 : 0

return messageCopy
}

private tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined {
Comment on lines +212 to +250
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Common logic here

// @ts-ignore
if (this.messageTimestampField in message) {
// @ts-ignore
Expand All @@ -213,6 +256,18 @@ export abstract class AbstractQueueService<
return undefined
}

private tryToExtractNumberOfRetries(message: MessagePayloadSchemas): number | undefined {
if (
this.messageNumberOfRetriesField in message &&
typeof message[this.messageNumberOfRetriesField] === 'number'
) {
// @ts-ignore
return message[this.messageNumberOfRetriesField]
}

return undefined
}

protected abstract resolveNextFunction(
preHandlers: Prehandler<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[],
message: MessagePayloadSchemas,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/core",
"version": "13.3.2",
"version": "13.4.0",
"private": false,
"license": "MIT",
"description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Expand Down
16 changes: 4 additions & 12 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,24 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
}

try {
messageSchemaResult.result.parse(message)

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
message[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}
const parsedMessage = messageSchemaResult.result.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

const input = {
Message: JSON.stringify(message),
TopicArn: this.topicArn,
...options,
} satisfies PublishCommandInput
const command = new PublishCommand(input)
await this.snsClient.send(command)
this.handleMessageProcessed(message, 'published')
this.handleMessageProcessed(parsedMessage, 'published')
} catch (error) {
const err = error as Error
this.handleError(err)
Expand Down
6 changes: 2 additions & 4 deletions packages/sns/lib/utils/snsMessageDeserializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ describe('messageDeserializer', () => {
const messagePayload = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
nonSchemaField: 'nonSchemaField',
}
Expand Down Expand Up @@ -47,15 +46,14 @@ describe('messageDeserializer', () => {
parsedMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
},
})
})

it('throws an error on invalid JSON', () => {
const messagePayload: Partial<PERMISSIONS_MESSAGE_TYPE> = {
userIds: [1],
permissions: ['perm'],
}

const snsMessage: SNS_MESSAGE_BODY_TYPE = {
Expand Down Expand Up @@ -89,7 +87,7 @@ describe('messageDeserializer', () => {

it('throws an error on invalid SNS envelope', () => {
const messagePayload: Partial<PERMISSIONS_MESSAGE_TYPE> = {
userIds: [1],
permissions: ['perm'],
}

const snsMessage: Partial<SNS_MESSAGE_BODY_TYPE> = {
Expand Down
6 changes: 3 additions & 3 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "15.1.0",
"version": "15.2.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down Expand Up @@ -32,8 +32,8 @@
"peerDependencies": {
"@aws-sdk/client-sns": "^3.556.0",
"@aws-sdk/client-sqs": "^3.556.0",
"@message-queue-toolkit/core": "^13.1.0",
"@message-queue-toolkit/sqs": "^15.0.0"
"@message-queue-toolkit/core": "^13.4.0",
"@message-queue-toolkit/sqs": "^15.1.0"
},
"devDependencies": {
"@aws-sdk/client-sns": "^3.569.0",
Expand Down
Loading
Loading