-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle RabbitMQ reconnect and message resending #12
Conversation
-Library manages connection retries and is basically a drop-in replacement for amqplib -Provides a setup function which can be used to specifiy desired behavior on connection reconnect (queue assertion, etc.)
- All direct references to a Channel become references to a persistent ChannelWrapper - Added ability to pass onConnect function in IAmqpCacoonConfig - onConnect is called each time a channel connection re-connects and can be used to assert queues and perform other setup tasks
- Cleaning up a bit how we add a setup function to the ChannelWrapper
-After replacing amqpLib with AmqpConnectionManager
- Assuming node-amqp-connection-library guarantees receipt of a published message, we shouldn't have to manually resend on drain - HOWEVER, it seems there is a bug in this library and it doesn't handle drain events. Creating a separate story for that!
-Removed it in code previously but not in test
Have more things to write, but updating references to amqp to refer to amqp-node-connection-manager
- Added test that can be run manually to ensure that published messages sent while network is down are received when network reconnects - Added simple callbacks to AmqpCacoon config so we can attach events to onConnect / onDisconnect events emitted by AmqpChannelManager
readme.md
Outdated
- Publish flow control included out of the box (Wait for drain event if we can't publish) | ||
- timeout if drain event does not occurs after some amount of time when channel is not ready to receive a publish | ||
- Simple interace around `node-amqp-manager` | ||
- ~Publish flow control included out of the box (Wait for drain event if we can't publish) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick typo fix: In GitHub, a you'll need double-tildes to strikethrough: ~~text to strikethrough~~
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, just the minor typo in the README.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logically this appears really solid. I have some small comments in the code but nothing major.
During one of these comments ensure you using semantic-release syntax to ensure the version number gets incremented for the release process. See https://github.com/semantic-release/semantic-release for full details on the semantic-release commit message stuff. Essentially, one of your commit messages needs to have something like perf(feature_name): something about the features
and then semantic-release will automatically increment a major version number when creating the npm release package.
package.json
Outdated
@@ -16,6 +16,8 @@ | |||
"test": "mocha -r ts-node/register ./tests/**/*.test.ts" | |||
}, | |||
"dependencies": { | |||
"@types/amqp-connection-manager": "^2.0.10", | |||
"amqp-connection-manager": "^3.2.1", | |||
"amqplib": "^0.5.5", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove amqplib
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also I'm seeing rules-js as a dependancy. I'm sure that was my fault at some point but could you remove that while you are at it too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@daniellmorris I pulled out rules-js, but couldn't pull out amqplib completely because we still need access to some of the classes in there (and they're not exposed by amqp-connection-manager). We can discuss if ya want.
src/index.ts
Outdated
|
||
if (config.port) { | ||
fullHostNameString = fullHostNameString + ':' + config.port; | ||
private pubChannelWrapper: ChannelWrapper | null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like your tabwidth is set to 4 spaces instead of 2. Lets fix that for consistency :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hehe, uh oh...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
src/index.ts
Outdated
// Connect if needed | ||
this.connection = | ||
this.connection || | ||
(await amqp.connect([this.fullHostName], this.amqp_opts)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, it appears that amqp.connect doesn't return a promise. We should be able to remove the await here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you think I'm wrong then feel free to ignore this comment because it doesn't actually hurt the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you're correct!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
src/index.ts
Outdated
logger?: Logger; | ||
}; | ||
onChannelConnect?: ConnectCallback; | ||
onBrokerConnect?: Function; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there are reason we are using Function
instead of something like the following?
onBrokerConnect?: () => void
// Or if you really need it to be any input and any output
onBrokerConnect?: (...args: any[]) => any
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh just probably my own JS / TS clumsiness!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Fix strikeout
-About network reconnects / caching messages sent while AMQP is down
…tech-sd/amqp-cacoon into alex/IPB-139-network-retries
- It doesn't return a promise, no need to use await
-It's unused
🎉 This PR is included in version 2.0.0 🎉 The release is available on: Your semantic-release bot 📦🚀 |
Replaces the underlying amqplib library with node-amqp-connection-manager, which provides support for network reconnects and handles re-sending of any messages sent while the network is down.
Note that node-amqp-connection-manager currently has a reported bug and does not currently handle drain events. While it guarantees receipt of all published messages, it does not resend failed messages when a drain emit is fired. This would ideally be fixed in the library and allow us to resend on drain like we did previously (instead of on a fixed time interval). See jwalton/node-amqp-connection-manager#129
Closes #11