diff --git a/README.md b/README.md index d721527c..9058a891 100644 --- a/README.md +++ b/README.md @@ -21,17 +21,20 @@ It consists of the following submodules: ### Publishers -`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol. +`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocols. They implement the following public methods: * `constructor()`, which accepts the following parameters: * `dependencies` – a set of dependencies depending on the protocol; * `options`, composed by * `messageSchemas` – the `zod` schemas for all supported messages; + * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer; * `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation * `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`. - * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`. - * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer; + * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`; + * `deletionConfig` - automatic cleanup of resources; + * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); + * `logMessages` - add logs for processed messages. * `init()`, prepare publisher for use (e. g. establish all necessary connections); * `close()`, stop publisher use (e. g. disconnect); * `publish()`, send a message to a queue or topic. It accepts the following parameters: @@ -42,7 +45,7 @@ They implement the following public methods: ### Consumers -`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol. +`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocols. They expose the following public methods: Multi-schema consumers support multiple message types via handler configs. They expose the following public methods: @@ -58,9 +61,11 @@ Multi-schema consumers support multiple message types via handler configs. They * `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`. * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`. * `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist. + * `deletionConfig` - automatic cleanup of resources; * `consumerOverrides` – available only for SQS consumers; - * `deadLetterQueue` - available only for SQS and SNS consumers; please read the section below to understand how to use it. - * `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers; + * `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information); + * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); + * `logMessages` - add logs for processed messages. * `init()`, prepare consumer for use (e. g. establish all necessary connections); * `close()`, stop listening for messages and disconnect; * `start()`, which invokes `init()`. @@ -68,7 +73,7 @@ Multi-schema consumers support multiple message types via handler configs. They > **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts) for a practical example. -##### How to define a handler +#### Handlers You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder. @@ -80,9 +85,10 @@ type ExecutionContext = { userService: UserService } -export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema< +export class SqsPermissionConsumer extends AbstractSqsConsumer< SupportedMessages, - ExecutionContext + ExecutionContext, + PreHandlerContext > { constructor( dependencies: SQSConsumerDependencies, @@ -94,7 +100,8 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema< // handlers: new MessageHandlerConfigBuilder< SupportedMessages, - ExecutionContext + ExecutionContext, + PreHandlerContext >() .addConfig( PERMISSIONS_ADD_MESSAGE_SCHEMA, @@ -149,7 +156,7 @@ Then the message is automatically nacked without requeueing by the abstract cons > **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumer.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumer.spec.ts) for a practical example. -### Barrier pattern +### Barrier Pattern The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived). To enable this pattern you should define `preHandlerBarrier` on your message handler in order to define the conditions for starting to process the message. @@ -182,9 +189,9 @@ Both publishers and consumers accept a queue name and configuration as parameter If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist. -## Handler spies +## Handler Spies -In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly. +In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into `message-queue-toolkit` directly. In order to enable this functionality, configure spyHandler on the publisher or consumer: @@ -235,51 +242,3 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1') expect(result.processingResult).toEqual('consumed') ``` -## Automatic Reconnects (RabbitMQ) - -`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. - -Example: - -```ts -export const TEST_AMQP_CONFIG: AmqpConfig = { - vhost: '', - hostname: 'localhost', - username: 'guest', - password: 'guest', - port: 5672, - useTls: false, -} - -const amqpConnectionManager = new AmqpConnectionManager(config, logger) -await amqpConnectionManager.init() - -const publisher = new TestAmqpPublisher( - { amqpConnectionManager }, - { - /// other amqp options - }) -await publisher.init() - -const consumer = new TestAmqpConsumer( - { amqpConnectionManager }, - { - /// other amqp options - }) -await consumer.start() - -// break connection, to simulate unexpected disconnection in production -await (await amqpConnectionManager.getConnection()).close() - -const message = { - // some test message -} - -// This will fail, but will trigger reconnection within amqpConnectionManager -publisher.publish(message) - -// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager - -// This will succeed and consumer, which also received new connection, will be able to consume it -publisher.publish(message) -``` diff --git a/packages/amqp/README.md b/packages/amqp/README.md new file mode 100644 index 00000000..7d1b9abc --- /dev/null +++ b/packages/amqp/README.md @@ -0,0 +1,67 @@ +# AMQP (Advanced Message Queuing Protocol) + +The library provides support for both direct exchanges and topic exchanges. + +> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation. +> + +## Publishers + +Use `AbstractAmqpQueuePublisher` to implement direct exchange and `AbstractAmqpTopicPublisher` for topic exchange. + +See [test publisher](test/publishers/AmqpPermissionPublisher.ts) for an example of implementation. + +## Consumers + +Use `AbstractAmqpQueueConsumer` to implement direct exchange and `AbstractAmqpTopicConsumer` for topic exchange. + +See [test consumer](test/consumers/AmqpPermissionConsumer.ts) for an example of implementation. + +## Automatic Reconnects + +`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism. + +Example: + +```ts +export const TEST_AMQP_CONFIG: AmqpConfig = { + vhost: '', + hostname: 'localhost', + username: 'guest', + password: 'guest', + port: 5672, + useTls: false, +} + +const amqpConnectionManager = new AmqpConnectionManager(config, logger) +await amqpConnectionManager.init() + +const publisher = new TestAmqpPublisher( + { amqpConnectionManager }, + { + // other amqp options + }) +await publisher.init() + +const consumer = new TestAmqpConsumer( + { amqpConnectionManager }, + { + // other amqp options + }) +await consumer.start() + +// break connection, to simulate unexpected disconnection in production +await (await amqpConnectionManager.getConnection()).close() + +const message = { + // some test message +} + +// This will fail, but will trigger reconnection within amqpConnectionManager +publisher.publish(message) + +// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager + +// This will succeed and consumer, which also received new connection, will be able to consume it +publisher.publish(message) +```