-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dead letter queue #117
Dead letter queue #117
Conversation
"version": "12.0.1", | ||
"version": "12.0.2", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if patch
or minor
, the new parameter is there but it can't be used so I guess it is a patch
@@ -15,6 +15,52 @@ import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' | |||
import { AmqpPermissionConsumer } from './AmqpPermissionConsumer' | |||
|
|||
describe('AmqpPermissionConsumer', () => { | |||
describe('init', () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding some tests to make sure unsupported options are throwing an error and not silently accepted
} & CommonQueueOptions | ||
} | ||
|
||
type ExistingQueueOptions<QueueLocatorType extends object> = { | ||
locatorConfig: QueueLocatorType | ||
deletionConfig?: DeletionConfig | ||
creationConfig?: never | ||
} & CommonQueueOptions | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CommonQueueOptions
is already being used on QueueOptions
, so avoiding repetition
export type ExtraSQSCreationParams = { | ||
topicArnsWithPublishPermissionsPrefix?: string | ||
updateAttributesIfExists?: boolean | ||
} | ||
|
||
export type SQSCreationConfig = { | ||
queue: CreateQueueRequest | ||
updateAttributesIfExists?: boolean | ||
} & ExtraSQSCreationParams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving them to service as they are common for publisher and consumer
// ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter | ||
// If we fail due to unknown reason, let's retry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess DLQ is addressing this TODO
To create a dead letter queue, you need to specify the `deadLetterQueue` parameter within options on the consumer configuration. The parameter should contain the following fields: | ||
- `creationConfig`: configuration for queue to create, if one does not exist. Should not be specified together with the `locatorConfig` | ||
- `locatorConfig`: configuration for resolving existing queue. Should not be specified together with the `creationConfig` | ||
- `redrivePolicy`: an object that contains the following fields: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very SQS-specific, I wonder if we should have a higher-level concept on top of it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will make it more generic once we address the comment below, thanks Igor 🙇
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a note a top explaining DLQ support, so I guess it is fine to keep this coupled to SQS for now, please let me know if you disagree
README.md
Outdated
> **_NOTE:_** DLQ is not available for AMQP consumers. | ||
|
||
> **_NOTE:_** DLQ could produce message duplication if used together with the barrier pattern (due to the | ||
retry mechanism), although this is unlikely to happen in practice. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would clarify it - if a service crashes after receiving retryLater result from a barrier and creating a new message for the retry, but before the original one is consumed, there will be a duplicate, because those are not being done as an atomary operation.
It would also be helpful to clarify how the barrier works when used together with DLQ, that we do not simply return message to the queue, but create a new one instead, in order to avoid exhausting DLQ limits early.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I will apply your suggestions to the node
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, please have a look if it is clearer now 🙏
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job, Carlos, this is epic!
A dead letter queue is a queue where messages that cannot be processed successfully are stored. It serves as a holding area for messages that have encountered errors or exceptions during processing, allowing developers to review and handle them later. To create a dead letter queue, you need to specify the
deadLetterQueue
parameter withinoptions
on the consumer configuration. The parameter should contain the following fields:creationConfig
: configuration for the queue to create, if one does not exist. Should not be specified together with thelocatorConfig
locatorConfig
: configuration for resolving existing queue. Should not be specified together with thecreationConfig
redrivePolicy
: an object that contains the following fields:maxReceiveCount
: the number of times a message can be received before being moved to the DLQ.Note: AMQP does not support DLQ at this point, this will be handled in a future version.
Known issue
If you use the barrier pattern together with a dead letter queue, you should be aware that the library does not simply return the message to the queue, but creates a new one instead to avoid exhausting DLQ limits early.
Due to this fact, in those cases, you could have duplicated messages if a service crashes after receiving
retryLater
result from the barrier and creating a new message for the retry, but before the original one is consumed (those are not being done as an atomic operation).In a future release, we will implement deduplication to remove this risk but in the meantime, please keep this in mind.