Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle RabbitMQ reconnect and message resending #12

Merged
merged 17 commits into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
"test": "mocha -r ts-node/register ./tests/**/*.test.ts"
},
"dependencies": {
"amqplib": "^0.5.5",
"amqp-connection-manager": "^3.2.1",
"amqplib": "^0.6.0",
"lodash": "^4.17.15",
"log4js": "^6.1.0",
"rules-js": "^1.0.0"
"log4js": "^6.1.0"
},
"author": "Valtech: Daniel Morris",
"license": "MIT",
"devDependencies": {
"@types/amqp-connection-manager": "^2.0.10",
"@types/amqplib": "^0.5.13",
"@types/chai": "^4.2.7",
"@types/lodash": "^4.14.149",
Expand Down
18 changes: 12 additions & 6 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@

## Overview

This is a basic library to provide amqp support. This library is a wrapper around amqplib and makes amqp easier to work with.
This is a basic library to provide amqp support. Originally, this library was a wrapper around amqplib. It has since been updated to work with [node-amqp-connection-manager](https://github.com/jwalton/node-amqp-connection-manager), which provides support for behind-the-scenes retries on network failure. Node-amqp-connection-manager guarantees receipt of published messages and provides wrappers around potentially non-persistent channels.

## Features

- Simple interace around amqplib
- Publish flow control included out of the box (Wait for drain event if we can't publish)
- timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish
- Simple interace around `node-amqp-manager`
- ~~Publish flow control included out of the box (Wait for drain event if we can't publish)~~
- timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish~. As of 9/26, the publish on drain functionality has been removed, as `node-amqp-manager` does not support it at this time (pending a bugfix).
- Consume single or batch of messages
daniellmorris marked this conversation as resolved.
Show resolved Hide resolved
- Automatically handles reconnect if AMQP connection is lost and re-established
- Caches published messages if they are published while AMQP is disconnected

## Requirements to tests

Expand Down Expand Up @@ -147,11 +149,15 @@ amqpCacoon.registerConsumerBatch(
);
```

## Dealing With Channels
## Dealing With Channels via ChannelWrapper

This library expose amqplib channel when you call either `getConsumerChannel` or `getPublishChannel`. The channel is also exposed when registering a consumer. To learn more about that api see documentation for [amqplib](https://www.npmjs.com/package/amqplib). Just a couple thing that you should remember to do.
This library exposes node-amqp-connection-manager's ChannelWrapper when you call either `getConsumerChannel` or `getPublishChannel`. Instead of exposing the Amqp Channel directly (which may or may not be valid depending on the network status), AmqpConnectionManager provides a ChannelWrapper class as an interface to interacting with the underlying channel. Most functions that can be performed on an AmqpLib `Channel` can be performed on the `ChannelWrapper`, including `ackAll`, `nackAll`, etc. though they are Promise-based. See [AMQPConnectionManager's documentation](https://github.com/jwalton/node-amqp-connection-manager) for more info, as well as the underlying [amqplib docs](https://www.npmjs.com/package/amqplib).

Just a couple thing that you should remember to do.

1. Remember to ack or nack on all messages.
2. An alternative is to pass an option into the `registerConsumer` to not require an ack (noAck). The problem with this is that if your application is reset or errors out, you may loose the message or messages.


## Amqp-connection-manager Setup function
AmqpConnectionManager allows a setup function to be passed in its configuration, or added to a ChannelWrapper at any point. This function can be used with callbacks or Promises and direclty exposes the underlying AMQP channel (since we know it is valid at that point). The setup function is useful for asserting queues and performing other necessary tasks that must be completed once a valid connection to amqp is made. Again, see [AMQPConnectionManager's documentation](https://github.com/jwalton/node-amqp-connection-manager) for more details.
29 changes: 18 additions & 11 deletions src/helpers/message_batching_manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Channel, ConsumeMessage, ConsumeBatchMessages } from '../index';
import { Logger } from 'log4js';
import {ChannelWrapper, ConsumeMessage, ConsumeBatchMessages} from '../index';
import {Logger} from 'log4js';

export interface IMessageBatchingManagerConfig {
providers: {
Expand All @@ -14,8 +14,9 @@ export default class MessageBatchingManager {
private unackedMessageList: Array<ConsumeMessage> = [];
private bufferSize: number = 1;
private timerHandle?: NodeJS.Timeout;
private amqpChannel?: Channel;
private amqpChannel?: ChannelWrapper;
private logger?: Logger;

constructor(private config: IMessageBatchingManagerConfig) {
this.logger = config.providers.logger;

Expand Down Expand Up @@ -73,7 +74,7 @@ export default class MessageBatchingManager {
// 2. Reset message list
this.resetMessages();

return { bufferSize, unackedMessageList };
return {bufferSize, unackedMessageList};
}

/**
Expand All @@ -85,7 +86,7 @@ export default class MessageBatchingManager {
* @param channel: Channel - Channel
* @param messageList: Array<ConsumeMessage> - Messages to be acked
*/
ackMessageList(channel: Channel, messageList: Array<ConsumeMessage>) {
ackMessageList(channel: ChannelWrapper, messageList: Array<ConsumeMessage>) {
if (this.logger) {
this.logger.trace(`MessageBatchingManager.ackMessageList: Start`);
}
Expand All @@ -109,7 +110,7 @@ export default class MessageBatchingManager {
* @param messageList: Array<ConsumeMessage> - Messages to be nacked
*/
nackMessageList(
channel: Channel,
channel: ChannelWrapper,
messageList: Array<ConsumeMessage>,
requeue?: boolean
) {
Expand Down Expand Up @@ -138,14 +139,17 @@ export default class MessageBatchingManager {
* @returns void
*/
async sendBufferedMessages(
channel: Channel,
handler: (channel: Channel, msg: ConsumeBatchMessages) => Promise<void>
channel: ChannelWrapper,
handler: (
channel: ChannelWrapper,
msg: ConsumeBatchMessages
) => Promise<void>
) {
let unackedMessageList: Array<ConsumeMessage> = [];
let bufferSize: number;
try {
// 1. Finalize message buffer and fetch buffer and unackedMessageList
({ bufferSize, unackedMessageList } = this.finalizeMessages());
({bufferSize, unackedMessageList} = this.finalizeMessages());

// 2. Send messages to handler
let messages: ConsumeBatchMessages = {
Expand Down Expand Up @@ -190,9 +194,12 @@ export default class MessageBatchingManager {
* @returns void
*/
async handleMessageBuffering(
channel: Channel,
channel: ChannelWrapper,
msg: ConsumeMessage,
handler: (channel: Channel, msg: ConsumeBatchMessages) => Promise<void>
handler: (
channel: ChannelWrapper,
msg: ConsumeBatchMessages
) => Promise<void>
) {
// 1. Set channel variable for this object. This way we don't have to do an async call elswhere.
this.amqpChannel = channel;
Expand Down
Loading