Skip to content

Commit

Permalink
Implement automatic reconnection propagation (#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
kibertoad authored Sep 28, 2023
1 parent 41ab8a3 commit 7de0fd0
Show file tree
Hide file tree
Showing 18 changed files with 494 additions and 130 deletions.
49 changes: 49 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,52 @@ SQS queues are built in a way that every message is only consumed once, and then
Both publishers and consumers accept a queue name and configuration as parameters. If the referenced queue (or SNS topic) does not exist at the moment the publisher or the consumer is instantiated, it is automatically created. Similarly, if the referenced topic does not exist during instantiation, it is also automatically created.

If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.

## Automatic Reconnects (RabbitMQ)

`message-queue-toolkit` automatically reestablishes connections for all publishers and consumers via `AmqpConnectionManager` mechanism.

Example:

```ts
export const TEST_AMQP_CONFIG: AmqpConfig = {
vhost: '',
hostname: 'localhost',
username: 'guest',
password: 'guest',
port: 5672,
useTls: false,
}

const amqpConnectionManager = new AmqpConnectionManager(config, logger)
await amqpConnectionManager.init()

const publisher = new TestAmqpPublisher(
{ amqpConnectionManager },
{
/// other amqp options
})
await publisher.init()

const consumer = new TestAmqpConsumer(
{ amqpConnectionManager },
{
/// other amqp options
})
await consumer.start()

// break connection, to simulate unexpected disconnection in production
await (await amqpConnectionManager.getConnection()).close()

const message = {
// some test message
}

// This will fail, but will trigger reconnection within amqpConnectionManager
publisher.publish(message)

// eventually connection is reestablished and propagated across all the AMQP services that use same amqpConnectionManager

// This will succeed and consumer, which also received new connection, will be able to consume it
publisher.publish(message)
```
6 changes: 4 additions & 2 deletions packages/amqp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ export type { CommonMessage } from './lib/types/MessageTypes'

export type { AMQPQueueConfig } from './lib/AbstractAmqpService'

export { AbstractAmqpConsumer } from './lib/AbstractAmqpConsumer'
export { AbstractAmqpConsumerMonoSchema } from './lib/AbstractAmqpConsumerMonoSchema'
export { AbstractAmqpConsumerMultiSchema } from './lib/AbstractAmqpConsumerMultiSchema'
export { AmqpConsumerErrorResolver } from './lib/errors/AmqpConsumerErrorResolver'

export { AbstractAmqpPublisher } from './lib/AbstractAmqpPublisher'
export { AbstractAmqpPublisherMonoSchema } from './lib/AbstractAmqpPublisherMonoSchema'
export { AbstractAmqpPublisherMultiSchema } from './lib/AbstractAmqpPublisherMultiSchema'

export type { AmqpConfig } from './lib/amqpConnectionResolver'

export { resolveAmqpConnection } from './lib/amqpConnectionResolver'
export { AmqpConnectionManager } from './lib/AmqpConnectionManager'
export type { ConnectionReceiver } from './lib/AmqpConnectionManager'
export { deserializeAmqpMessage } from './lib/amqpMessageDeserializer'
21 changes: 15 additions & 6 deletions packages/amqp/lib/AbstractAmqpBaseConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import type {
ExistingQueueOptions,
} from '@message-queue-toolkit/core'
import { isMessageError, parseMessage } from '@message-queue-toolkit/core'
import type { Message } from 'amqplib'
import type { Connection, Message } from 'amqplib'

import type { AMQPConsumerDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'
import { AbstractAmqpService } from './AbstractAmqpService'
Expand Down Expand Up @@ -108,12 +108,12 @@ export abstract class AbstractAmqpBaseConsumer<MessagePayloadType extends object
}
}

async start() {
await this.init()
if (!this.channel) {
throw new Error('Channel is not set')
}
async receiveNewConnection(connection: Connection): Promise<void> {
await super.receiveNewConnection(connection)
await this.consume()
}

private async consume() {
await this.channel.consume(this.queueName, (message) => {
if (message === null) {
return
Expand Down Expand Up @@ -158,6 +158,15 @@ export abstract class AbstractAmqpBaseConsumer<MessagePayloadType extends object
})
}

async start() {
await this.init()
if (!this.channel) {
throw new Error('Channel is not set')
}

await this.consume()
}

protected resolveMessage(message: Message) {
return readAmqpMessage(message, this.errorResolver)
}
Expand Down
32 changes: 32 additions & 0 deletions packages/amqp/lib/AbstractAmqpBasePublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import type { Either } from '@lokalise/node-core'
import type { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core'
import { objectToBuffer } from '@message-queue-toolkit/core'

import { AbstractAmqpService } from './AbstractAmqpService'

export abstract class AbstractAmqpBasePublisher<
MessagePayloadType extends object,
> extends AbstractAmqpService<MessagePayloadType> {
protected sendToQueue(message: MessagePayloadType): void {
try {
this.channel.sendToQueue(this.queueName, objectToBuffer(message))
} catch (err) {
// Unfortunately, reliable retry mechanism can't be implemented with try-catch block,
// as not all failures end up here. If connection is closed programmatically, it works fine,
// but if server closes connection unexpectedly (e. g. RabbitMQ is shut down), then we don't land here
// @ts-ignore
if (err.message === 'Channel closed') {
this.logger.error(`AMQP channel closed`)
void this.reconnect()
} else {
throw err
}
}
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}
/* c8 ignore stop */
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type {
import { AbstractAmqpBaseConsumer } from './AbstractAmqpBaseConsumer'
import type { AMQPConsumerDependencies } from './AbstractAmqpService'

export abstract class AbstractAmqpConsumer<MessagePayloadType extends object>
export abstract class AbstractAmqpConsumerMonoSchema<MessagePayloadType extends object>
extends AbstractAmqpBaseConsumer<MessagePayloadType>
implements QueueConsumer
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import type { Either } from '@lokalise/node-core'
import type {
ExistingQueueOptions,
MessageInvalidFormatError,
MessageValidationError,
MonoSchemaQueueOptions,
NewQueueOptions,
SyncPublisher,
} from '@message-queue-toolkit/core'
import { objectToBuffer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer'
import { AbstractAmqpService } from './AbstractAmqpService'
import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher'
import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'

export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
extends AbstractAmqpService<MessagePayloadType>
export abstract class AbstractAmqpPublisherMonoSchema<MessagePayloadType extends object>
extends AbstractAmqpBasePublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
{
private readonly messageSchema: ZodSchema<MessagePayloadType>
Expand All @@ -39,17 +36,12 @@ export abstract class AbstractAmqpPublisher<MessagePayloadType extends object>
this.logMessage(resolvedLogMessage)
}

this.channel.sendToQueue(this.queueName, objectToBuffer(message))
this.sendToQueue(message)
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}

protected override resolveSchema(): Either<Error, ZodSchema<MessagePayloadType>> {
throw new Error('Not implemented for publisher')
}

/* c8 ignore stop */
}
17 changes: 4 additions & 13 deletions packages/amqp/lib/AbstractAmqpPublisherMultiSchema.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
import type { Either } from '@lokalise/node-core'
import type {
ExistingQueueOptions,
MessageInvalidFormatError,
MessageValidationError,
NewQueueOptions,
SyncPublisher,
MultiSchemaPublisherOptions,
} from '@message-queue-toolkit/core'
import { MessageSchemaContainer, objectToBuffer } from '@message-queue-toolkit/core'
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
import type { ZodSchema } from 'zod'

import type { AMQPLocatorType } from './AbstractAmqpBaseConsumer'
import { AbstractAmqpService } from './AbstractAmqpService'
import { AbstractAmqpBasePublisher } from './AbstractAmqpBasePublisher'
import type { AMQPDependencies, CreateAMQPQueueOptions } from './AbstractAmqpService'

export abstract class AbstractAmqpPublisherMultiSchema<MessagePayloadType extends object>
extends AbstractAmqpService<MessagePayloadType>
extends AbstractAmqpBasePublisher<MessagePayloadType>
implements SyncPublisher<MessagePayloadType>
{
private readonly messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
Expand Down Expand Up @@ -47,16 +45,9 @@ export abstract class AbstractAmqpPublisherMultiSchema<MessagePayloadType extend
this.logMessage(resolvedLogMessage)
}

this.channel.sendToQueue(this.queueName, objectToBuffer(message))
this.sendToQueue(message)
}

/* c8 ignore start */
protected resolveMessage(): Either<MessageInvalidFormatError | MessageValidationError, unknown> {
throw new Error('Not implemented for publisher')
}

/* c8 ignore stop */

protected override resolveSchema(
message: MessagePayloadType,
): Either<Error, ZodSchema<MessagePayloadType>> {
Expand Down
Loading

0 comments on commit 7de0fd0

Please sign in to comment.