-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AP-3648 SQS exponential delay between retries #150
Conversation
this.handleMessageProcessed(originalMessage, 'consumed') | ||
this.handleMessageProcessed(parsedMessage, 'consumed') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to always use parsed message on spy as it can break tests using toEqual
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop | ||
if (!isRetryDateExceeded(timestamp, this.maxRetryDuration)) { | ||
if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) { | ||
// TODO: Add retry delay + republish message updating internal properties |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not implemented within this PR, adding todo
shouldBeRetried(message: MessagePayloadSchemas, maxRetryDuration: number): boolean { | ||
const timestamp = this.tryToExtractTimestamp(message) ?? new Date() | ||
return !isRetryDateExceeded(timestamp, maxRetryDuration) | ||
} | ||
|
||
protected getMessageRetryDelayInSeconds(message: MessagePayloadSchemas): number { | ||
// if not defined, this is the first attempt | ||
const retries = this.tryToExtractNumberOfRetries(message) ?? 0 | ||
|
||
// exponential backoff -> (2 ^ (attempts)) * delay | ||
// delay = 1 second | ||
return Math.pow(2, retries) | ||
} | ||
|
||
protected updateInternalProperties(message: MessagePayloadSchemas): MessagePayloadSchemas { | ||
const messageCopy = { ...message } // clone the message to avoid mutation | ||
|
||
/** | ||
* If the message doesn't have a timestamp field -> add it | ||
* will be used to prevent infinite retries on the same message | ||
*/ | ||
if (!this.tryToExtractTimestamp(message)) { | ||
// @ts-ignore | ||
messageCopy[this.messageTimestampField] = new Date().toISOString() | ||
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`) | ||
} | ||
|
||
/** | ||
* add/increment the number of retries performed to exponential message delay | ||
*/ | ||
const numberOfRetries = this.tryToExtractNumberOfRetries(message) | ||
// @ts-ignore | ||
messageCopy[this.messageNumberOfRetriesField] = | ||
numberOfRetries !== undefined ? numberOfRetries + 1 : 0 | ||
|
||
return messageCopy | ||
} | ||
|
||
private tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Common logic here
This reverts commit 3ba05da.
@@ -42,11 +43,15 @@ export abstract class AbstractQueueService< | |||
ExecutionContext = undefined, | |||
PrehandlerOutput = undefined, | |||
> { | |||
// Used to keep track of the number of retries performed on consumer | |||
private readonly messageNumberOfRetriesField = '_internalNumberOfRetries' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retry
naming sounds confusing for me in this context, because it's not clear how we are counting them - so first attempt is retry zero, and then second attempt is retry 1?
BullMQ naming of attempts
is clearer for me; we can start at 1 (default when field is not set) and apply consistent counting logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me! will apply the change thanks Igor 🙇
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mmm, thinking about it, I am not really sure, this counting will only be applied on retryLater cases, meaning that errors will go with the SQS usual retry logic flow.
retry
here means, the number of times the consumer finished with retryLater
result, naming it attempt
I think can be more confusing, maybe something _internalNumberOfRetryLater
to make it more clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair point. maybe we can explain this with a jsdoc, without naming changes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course! I will add it in a bit :D thanks Igor 🙏
# Conflicts: # packages/amqp/package.json
# Conflicts: # packages/amqp/package.json
No description provided.