Skip to content

Commit

Permalink
MET-32 Update docs (#149)
Browse files Browse the repository at this point in the history
* MET-32 Update docs

* MET-32 Adjust README
  • Loading branch information
dariacm authored May 28, 2024
1 parent f2e9571 commit 80a5912
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 61 deletions.
81 changes: 20 additions & 61 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ It consists of the following submodules:

### Publishers

`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol.
`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocols.
They implement the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchemas` – the `zod` schemas for all supported messages;
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`;
* `deletionConfig` - automatic cleanup of resources;
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
Expand All @@ -42,7 +45,7 @@ They implement the following public methods:
### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.
`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocols.
They expose the following public methods:

Multi-schema consumers support multiple message types via handler configs. They expose the following public methods:
Expand All @@ -58,17 +61,19 @@ Multi-schema consumers support multiple message types via handler configs. They
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
* `deletionConfig` - automatic cleanup of resources;
* `consumerOverrides` – available only for SQS consumers;
* `deadLetterQueue` - available only for SQS and SNS consumers; please read the section below to understand how to use it.
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information);
* `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information);
* `logMessages` - add logs for processed messages.
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `start()`, which invokes `init()`.

> **_NOTE:_** See [SqsPermissionConsumer.ts](./packages/sqs/test/consumers/SqsPermissionConsumer.ts) for a practical example.

##### How to define a handler
#### Handlers

You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder.

Expand All @@ -80,9 +85,10 @@ type ExecutionContext = {
userService: UserService
}

export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
export class SqsPermissionConsumer extends AbstractSqsConsumer<
SupportedMessages,
ExecutionContext
ExecutionContext,
PreHandlerContext
> {
constructor(
dependencies: SQSConsumerDependencies,
Expand All @@ -94,7 +100,8 @@ export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
//
handlers: new MessageHandlerConfigBuilder<
SupportedMessages,
ExecutionContext
ExecutionContext,
PreHandlerContext
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
Expand Down Expand Up @@ -149,7 +156,7 @@ Then the message is automatically nacked without requeueing by the abstract cons

> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumer.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumer.spec.ts) for a practical example.
### Barrier pattern
### Barrier Pattern
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived).

To enable this pattern you should define `preHandlerBarrier` on your message handler in order to define the conditions for starting to process the message.
Expand Down Expand Up @@ -182,9 +189,9 @@ Both publishers and consumers accept a queue name and configuration as parameter

If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.

## Handler spies
## Handler Spies

In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into message-queue-toolkit directly.
In certain cases you want to await until certain publisher publishes a message, or a certain handler consumes a message. For that you can use handler spy functionality, built into `message-queue-toolkit` directly.

In order to enable this functionality, configure spyHandler on the publisher or consumer:

Expand Down Expand Up @@ -235,51 +242,3 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1')
expect(result.processingResult).toEqual('consumed')
```

## Automatic Reconnects (RabbitMQ)

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.

Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
/// other amqp options
})
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
/// other amqp options
})
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```
67 changes: 67 additions & 0 deletions packages/amqp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# AMQP (Advanced Message Queuing Protocol)

The library provides support for both direct exchanges and topic exchanges.

> **_NOTE:_** Check [README.md](../../README.md) for transport-agnostic library documentation.
>
## Publishers

Use `AbstractAmqpQueuePublisher` to implement direct exchange and `AbstractAmqpTopicPublisher` for topic exchange.

See [test publisher](test/publishers/AmqpPermissionPublisher.ts) for an example of implementation.

## Consumers

Use `AbstractAmqpQueueConsumer` to implement direct exchange and `AbstractAmqpTopicConsumer` for topic exchange.

See [test consumer](test/consumers/AmqpPermissionConsumer.ts) for an example of implementation.

## Automatic Reconnects

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.

Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
// other amqp options
})
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
// other amqp options
})
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```

0 comments on commit 80a5912

Please sign in to comment.