Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…lkit into chore/update-docker
  • Loading branch information
kibertoad committed Sep 28, 2023
2 parents c46e1dc + 41ab8a3 commit 1424f58
Show file tree
Hide file tree
Showing 65 changed files with 1,690 additions and 307 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
name: ci

on:
push:
branches:
- main
pull_request:

jobs:
Expand Down
131 changes: 123 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# message-queue-toolkit ✉️

[![NPM Version](https://img.shields.io/npm/v/@message-queue-toolkit/core.svg)](https://www.npmjs.com/package/@message-queue-toolkit/core)
[![NPM Downloads](https://img.shields.io/npm/dm/@message-queue-toolkit/core.svg)](https://npmjs.org/package/@message-queue-toolkit/core)
[![Build Status](https://github.com/kibertoad/message-queue-toolkit/workflows/ci/badge.svg)](https://github.com/kibertoad/message-queue-toolkit/actions)

Useful utilities, interfaces and base classes for message queue handling.

## Overview
Expand All @@ -16,26 +21,52 @@ It consists of the following submodules:

### Publishers

`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol. They implement the following public methods:
`message-queue-toolkit` provides base classes for implementing publishers for each of the supported protocol.

#### Mono-schema publishers

Mono-schema publishers only support a single message type and are simpler to implement. They expose the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchema` – the `zod` schema for the message;
* `messageTypeField`;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema;
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `init()`, which needs to be invoked before the publisher can be used;
* `close()`, which needs to be invoked when stopping the application;
* `publish()`, which accepts the following parameters:
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
* `message` – a message following a `zod` schema;
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).

> **_NOTE:_** See [SqsPermissionPublisherMonoSchema.ts](./packages/sqs/test/publishers/SqsPermissionPublisherMonoSchema.ts) for a practical example.
#### Multi-schema publishers

Multi-schema publishers support multiple messages types. They implement the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `messageSchemas` – the `zod` schemas for all supported messages;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `init()`, prepare publisher for use (e. g. establish all necessary connections);
* `close()`, stop publisher use (e. g. disconnect);
* `publish()`, send a message to a queue or topic. It accepts the following parameters:
* `message` – a message following one of the `zod` schemas, supported by the publisher;
* `options` – a protocol-dependent set of message parameters. For more information please check documentation for options for each protocol: [AMQP](https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue), [SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sqs/interfaces/sendmessagecommandinput.html) and [SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/clients/client-sns/interfaces/publishcommandinput.html).


### Consumers

`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol. They implement the following public methods:
`message-queue-toolkit` provides base classes for implementing consumers for each of the supported protocol.

#### Mono-schema consumers

Mono-schema consumers only support a single message type and are simpler to implement. They expose the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
Expand All @@ -48,13 +79,87 @@ It consists of the following submodules:
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
* `consumerOverrides` – available only for SQS consumers;
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `init()`, which needs to be invoked before the consumer can be used;
* `close()`, which needs to be invoked when stopping the application;
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;
* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
* `preHandlerBarrier`, which accepts as a parameter a `message` following a `zod` schema and can be overridden to enable the barrier pattern (see [Barrier pattern](#barrier-pattern))

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts) for a practical example.
#### Multi-schema consumers

Multi-schema consumers support multiple message types via handler configs. They expose the following public methods:

* `constructor()`, which accepts the following parameters:
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler;
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`.
* `subscriptionConfig` - SNS SQS consumer only - configuration for SNS -> SQS subscription to create, if one doesn't exist.
* `consumerOverrides` – available only for SQS consumers;
* `subscribedToTopic` – parameters for a topic to use during creation if it does not exist. Ignored if `queueLocator.subscriptionArn` is set. Available only for SNS consumers;
* `init()`, prepare consumer for use (e. g. establish all necessary connections);
* `close()`, stop listening for messages and disconnect;

* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
* `start()`, which invokes `init()` and `processMessage()` and handles errors.

##### Multi-schema handler definition

You can define handlers for each of the supported messages in a type-safe way using the MessageHandlerConfigBuilder.

Here is an example:

```ts
type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE

export class TestConsumerMultiSchema extends AbstractSqsConsumerMultiSchema<
SupportedMessages,
TestConsumerMultiSchema
> {
constructor(
dependencies: SQSConsumerDependencies,
) {
super(dependencies, {
//
// rest of configuration skipped
//
handlers: new MessageHandlerConfigBuilder<
SupportedMessages,
SqsPermissionConsumerMultiSchema
>()
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (message, context) => {
// process message
return {
result: 'success',
}
},
{
preHandlerBarrier: async (message) => {
// do barrier check here
return true
}
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA,
async (message, context) => {
// process message
return {
result: 'success',
}
})
.build(),
})
}
}
```

#### Error Handling

When implementing message handler in consumer (by overriding the `processMessage()` method), you are expected to return an instance of `Either`, containing either an error `retryLater`, or result `success`. In case of `retryLater`, the abstract consumer is instructed to requeue the message. Otherwise, in case of success, the message is finally removed from the queue. If an error is thrown while processing the message, the abstract consumer will also requeue the message. When overriding the `processMessage()` method, you should leverage the possible types to process the message as you need.
Expand All @@ -72,6 +177,16 @@ Then the message is automatically nacked without requeueing by the abstract cons

> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumerMonoSchema.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts) for a practical example.
### Barrier pattern
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not yet in the proper state to be able to process that message (e. g. some prerequisite messages have not yet arrived).

To enable this pattern you should implement `preHandlerBarrier` in order to define the conditions for starting to process the message.
If the barrier method returns `false`, message will be returned into the queue for the later processing. If the barrier method returns `true`, message will be processed.

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example on mono consumers.
> **_NOTE:_** See [SqsPermissionConsumerMultiSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts) for a practical example on multi consumers.

## Fan-out to Multiple Consumers

SQS queues are built in a way that every message is only consumed once, and then deleted. If you want to do fan-out to multiple consumers, you need SNS topic in the middle, which is then propagated to all the SQS queues that have subscribed.
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"name": "@message-queue-toolkit/parent",
"version": "1.0.0",
"dependencies": {
"@lokalise/node-core": "^5.8.1"
},
"scripts": {
"build": "npm run build --workspaces",
Expand Down
3 changes: 3 additions & 0 deletions packages/amqp/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
"@typescript-eslint/no-var-requires": "off",
"@typescript-eslint/indent": "off",
"@typescript-eslint/no-explicit-any": "warn",
"@typescript-eslint/restrict-template-expressions": "off",
"@typescript-eslint/no-unsafe-assignment": "off",
"@typescript-eslint/no-unsafe-argument": "off",
"@typescript-eslint/no-unsafe-member-access": "warn",
"@typescript-eslint/explicit-function-return-type": "off",
"@typescript-eslint/consistent-type-imports": "warn",
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ export type { CommonMessage } from './lib/types/MessageTypes'
export type { AMQPQueueConfig } from './lib/AbstractAmqpService'

export { AbstractAmqpConsumer } from './lib/AbstractAmqpConsumer'
export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema'
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export { AbstractAmqpPublisher } from './lib/AbstractAmqpPublisher'
export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema'

export type { AmqpConfig } from './lib/amqpConnectionResolver'

Expand Down
164 changes: 164 additions & 0 deletions packages/amqp/lib/AbstractAmqpBaseConsumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import type { Either, ErrorResolver } from '@lokalise/node-core'
import type {
QueueConsumer,
NewQueueOptions,
TransactionObservabilityManager,
ExistingQueueOptions,
} from '@message-queue-toolkit/core'
import { isMessageError, parseMessage } from '@message-queue-toolkit/core'
import type { Message } from 'amqplib'

import type { AMQPConsumerDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'
import { AbstractAmqpService } from './AbstractAmqpService'
import { readAmqpMessage } from './amqpMessageReader'

const ABORT_EARLY_EITHER: Either<'abort', never> = {
error: 'abort',
}

export type AMQPLocatorType = { queueName: string }

export type NewAMQPConsumerOptions = NewQueueOptions<CreateAMQPQueueOptions>

export type ExistingAMQPConsumerOptions = ExistingQueueOptions<AMQPLocatorType>

export abstract class AbstractAmqpBaseConsumer<MessagePayloadType extends object>
extends AbstractAmqpService<MessagePayloadType, AMQPConsumerDependencies>
implements QueueConsumer
{
private readonly transactionObservabilityManager?: TransactionObservabilityManager
protected readonly errorResolver: ErrorResolver

constructor(
dependencies: AMQPConsumerDependencies,
options: NewAMQPConsumerOptions | ExistingAMQPConsumerOptions,
) {
super(dependencies, options)
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
this.errorResolver = dependencies.consumerErrorResolver

if (!options.locatorConfig?.queueName && !options.creationConfig?.queueName) {
throw new Error('queueName must be set in either locatorConfig or creationConfig')
}
}

private async internalProcessMessage(
message: MessagePayloadType,
messageType: string,
): Promise<Either<'retryLater', 'success'>> {
const barrierPassed = await this.preHandlerBarrier(message, messageType)

if (barrierPassed) {
return this.processMessage(message, messageType)
}
return { error: 'retryLater' }
}

protected abstract preHandlerBarrier(
message: MessagePayloadType,
messageType: string,
): Promise<boolean>

abstract processMessage(
message: MessagePayloadType,
messageType: string,
): Promise<Either<'retryLater', 'success'>>

private deserializeMessage(message: Message | null): Either<'abort', MessagePayloadType> {
if (message === null) {
return ABORT_EARLY_EITHER
}

const resolveMessageResult = this.resolveMessage(message)
if (isMessageError(resolveMessageResult.error)) {
this.handleError(resolveMessageResult.error)
return ABORT_EARLY_EITHER
}

// Empty content for whatever reason
if (!resolveMessageResult.result) {
return ABORT_EARLY_EITHER
}

const resolveSchemaResult = this.resolveSchema(
resolveMessageResult.result as MessagePayloadType,
)
if (resolveSchemaResult.error) {
this.handleError(resolveSchemaResult.error)
return ABORT_EARLY_EITHER
}

const deserializationResult = parseMessage(
resolveMessageResult.result,
resolveSchemaResult.result,
this.errorResolver,
)
if (isMessageError(deserializationResult.error)) {
this.handleError(deserializationResult.error)
return ABORT_EARLY_EITHER
}

// Empty content for whatever reason
if (!deserializationResult.result) {
return ABORT_EARLY_EITHER
}

return {
result: deserializationResult.result,
}
}

async start() {
await this.init()
if (!this.channel) {
throw new Error('Channel is not set')
}

await this.channel.consume(this.queueName, (message) => {
if (message === null) {
return
}

const deserializedMessage = this.deserializeMessage(message)
if (deserializedMessage.error === 'abort') {
this.channel.nack(message, false, false)
return
}
// @ts-ignore
const messageType = deserializedMessage.result[this.messageTypeField]
const transactionSpanId = `queue_${this.queueName}:${
// @ts-ignore
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
deserializedMessage.result[this.messageTypeField]
}`

this.transactionObservabilityManager?.start(transactionSpanId)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType)
this.logMessage(resolvedLogMessage)
}
this.internalProcessMessage(deserializedMessage.result, messageType)
.then((result) => {
if (result.error === 'retryLater') {
this.channel.nack(message, false, true)
}
if (result.result === 'success') {
this.channel.ack(message)
}
})
.catch((err) => {
// ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter
// If we fail due to unknown reason, let's retry
this.channel.nack(message, false, true)
this.handleError(err)
})
.finally(() => {
this.transactionObservabilityManager?.stop(transactionSpanId)
})
})
}

protected resolveMessage(message: Message) {
return readAmqpMessage(message, this.errorResolver)
}
}
Loading

0 comments on commit 1424f58

Please sign in to comment.