Skip to content

Commit

Permalink
fix: Emit connectFailed on connection failure.
Browse files Browse the repository at this point in the history
fix #222

BREAKING CHANGE: We will no longer emit a `disconnect` event on an
initial connection failure - instead we now emit `connectFailed` on each
connection failure, and only emit `disconnect` when we transition from
connected to disconnected.
  • Loading branch information
jwalton committed Jan 7, 2022
1 parent 8d181f9 commit 0f05987
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 27 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,15 @@ Options:
- `options.reconnectTimeInSeconds` - The time to wait before trying to reconnect. If not specified,
defaults to `heartbeatIntervalInSeconds`.
- `options.findServers(callback)` is a function which returns one or more servers to connect to. This should
return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism
return either a single URL or an array of URLs. This is handy when you're using a service discovery mechanism.
such as Consul or etcd. Instead of taking a `callback`, this can also return a Promise. Note that if this
is supplied, then `urls` is ignored.
- `options.connectionOptions` is passed as options to the amqplib connect method.

### AmqpConnectionManager events

- `connect({connection, url})` - Emitted whenever we successfully connect to a broker.
- `connectFailed({err, url})` - Emitted whenever we attempt to connect to a broker, but fail.
- `disconnect({err})` - Emitted whenever we disconnect from a broker.
- `blocked({reason})` - Emitted whenever a connection is blocked by a broker
- `unblocked` - Emitted whenever a connection is unblocked by a broker
Expand Down Expand Up @@ -145,6 +146,10 @@ Returns true if the AmqpConnectionManager is connected to a broker, false otherw

Close this AmqpConnectionManager and free all associated resources.

### AmqpConnectionManager#connectionAttempts

This is the number of times we've tried to connect to a broker.

### ChannelWrapper events

- `connect` - emitted every time this channel connects or reconnects.
Expand Down
48 changes: 29 additions & 19 deletions src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export interface ConnectListener {
(arg: { connection: Connection; url: string | amqp.Options.Connect }): void;
}

export interface ConnectFailedListener {
(arg: { err: Error; url: string | amqp.Options.Connect | undefined }): void;
}

export type AmpqConnectionOptions = (ConnectionOptions | TcpSocketConnectOpts) & {
noDelay?: boolean;
timeout?: number;
Expand Down Expand Up @@ -78,6 +82,7 @@ export interface IAmqpConnectionManager {

addListener(event: string, listener: (...args: any[]) => void): this;
addListener(event: 'connect', listener: ConnectListener): this;
addListener(event: 'connectFailed', listener: ConnectFailedListener): this;
addListener(event: 'blocked', listener: (arg: { reason: string }) => void): this;
addListener(event: 'unblocked', listener: () => void): this;
addListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;
Expand All @@ -87,24 +92,28 @@ export interface IAmqpConnectionManager {

on(event: string, listener: (...args: any[]) => void): this;
on(event: 'connect', listener: ConnectListener): this;
on(event: 'connectFailed', listener: ConnectFailedListener): this;
on(event: 'blocked', listener: (arg: { reason: string }) => void): this;
on(event: 'unblocked', listener: () => void): this;
on(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

once(event: string, listener: (...args: any[]) => void): this;
once(event: 'connect', listener: ConnectListener): this;
once(event: 'connectFailed', listener: ConnectFailedListener): this;
once(event: 'blocked', listener: (arg: { reason: string }) => void): this;
once(event: 'unblocked', listener: () => void): this;
once(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

prependListener(event: string, listener: (...args: any[]) => void): this;
prependListener(event: 'connect', listener: ConnectListener): this;
prependListener(event: 'connectFailed', listener: ConnectFailedListener): this;
prependListener(event: 'blocked', listener: (arg: { reason: string }) => void): this;
prependListener(event: 'unblocked', listener: () => void): this;
prependListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;

prependOnceListener(event: string, listener: (...args: any[]) => void): this;
prependOnceListener(event: 'connect', listener: ConnectListener): this;
prependOnceListener(event: 'connectFailed', listener: ConnectFailedListener): this;
prependOnceListener(event: 'blocked', listener: (arg: { reason: string }) => void): this;
prependOnceListener(event: 'unblocked', listener: () => void): this;
prependOnceListener(event: 'disconnect', listener: (arg: { err: Error }) => void): this;
Expand All @@ -127,6 +136,7 @@ export interface IAmqpConnectionManager {
//
// Events:
// * `connect({connection, url})` - Emitted whenever we connect to a broker.
// * `connectFailed({err, url})` - Emitted whenever we fail to connect to a broker.
// * `disconnect({err})` - Emitted whenever we disconnect from a broker.
// * `blocked({reason})` - Emitted whenever connection is blocked by a broker.
// * `unblocked()` - Emitted whenever connection is unblocked by a broker.
Expand All @@ -143,19 +153,20 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
| (() => Promise<ConnectionUrl | ConnectionUrl[]>);
private _urls?: ConnectionUrl[];

/**
* Keep track of whether a disconnect event has been sent or not. The problem
* is that if we've never connected, and we encounter an error, we want to
* generate a "disconnect" event, even though we're not disconnected, otherwise
* the caller will never know there was an error. So we can't just rely on
* this._currentConnection.
*/
private _disconnectSent = false;

public connectionOptions: AmpqConnectionOptions | undefined;
public heartbeatIntervalInSeconds: number;
public reconnectTimeInSeconds: number;

private _connectionAttempts = 0;

/**
* The number of connection attempts this connection manager has made,
* successful, failed, or in-progress..
*/
get connectionAttempts(): number {
return this._connectionAttempts;
}

/**
* Create a new AmqplibConnectionManager.
*
Expand Down Expand Up @@ -213,7 +224,7 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
this._connect();

let reject: (reason?: any) => void;
const onDisconnect = ({ err }: { err: any }) => {
const onConnectFailed = ({ err }: { err: any }) => {
// Ignore disconnects caused by dead servers etc., but throw on operational errors like bad credentials.
if (err.isOperational) {
reject(err);
Expand All @@ -225,7 +236,7 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
once(this, 'connect'),
new Promise((_resolve, innerReject) => {
reject = innerReject;
this.on('disconnect', onDisconnect);
this.on('connectFailed', onConnectFailed);
}),
...(timeout
? [
Expand All @@ -236,7 +247,7 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
: []),
]);
} finally {
this.removeListener('disconnect', onDisconnect);
this.removeListener('connectFailed', onConnectFailed);
}
}

Expand Down Expand Up @@ -302,7 +313,6 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
})
.then(() => {
this._currentConnection = undefined;
this._disconnectSent = true;
this.emit('disconnect', { err: new Error('forced reconnect') });
return this._connect();
})
Expand All @@ -329,6 +339,10 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
return Promise.resolve(null);
}

this._connectionAttempts++;

let attemptedUrl: string | amqp.Options.Connect | undefined;

const result = (this._connectPromise = Promise.resolve()
.then(() => {
if (!this._urls || this._currentUrl >= this._urls.length) {
Expand Down Expand Up @@ -372,6 +386,7 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
heartbeat: url.heartbeat ?? this.heartbeatIntervalInSeconds,
};
}
attemptedUrl = originalUrl;

// Add the `heartbeastIntervalInSeconds` to the connection options.
if (typeof connect === 'string') {
Expand Down Expand Up @@ -400,7 +415,6 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
// Reconnect if the connection closes
connection.on('close', (err) => {
this._currentConnection = undefined;
this._disconnectSent = true;
this.emit('disconnect', { err });

const handle = wait(this.reconnectTimeInSeconds * 1000);
Expand All @@ -413,18 +427,14 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
});

this._connectPromise = undefined;
this._disconnectSent = false;
this.emit('connect', { connection, url: originalUrl });

// Need to return null here, or Bluebird will complain - #171.
return null;
});
})
.catch((err) => {
if (!this._disconnectSent) {
this._disconnectSent = true;
this.emit('disconnect', { err });
}
this.emit('connectFailed', { err, url: attemptedUrl });

// Connection failed...
this._currentConnection = undefined;
Expand Down
16 changes: 9 additions & 7 deletions test/AmqpConnectionManagerTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ describe('AmqpConnectionManager', function () {
},
});
amqp.connect();
const [{ err }] = await once(amqp, 'disconnect');
const [{ err }] = await once(amqp, 'connectFailed');
expect(err.message).to.contain('No servers found');
expect(amqp.connectionAttempts).to.equal(1);
return amqp?.close();
});

Expand Down Expand Up @@ -189,22 +190,23 @@ describe('AmqpConnectionManager', function () {
it("should reconnect to the broker if it can't connect in the first place", async () => {
amqplib.deadServers = ['amqp://rabbit1'];

// Should try to connect to rabbit1 first and be refused, and then succesfully connect to rabbit2.
// Should try to connect to rabbit1 first and be refused, and then successfully connect to rabbit2.
amqp = new AmqpConnectionManager(['amqp://rabbit1', 'amqp://rabbit2'], {
heartbeatIntervalInSeconds: 0.01,
});
amqp.connect();

let disconnectEventsSeen = 0;
amqp.on('disconnect', function () {
disconnectEventsSeen++;
let connectFailedSeen = 0;
amqp.on('connectFailed', function () {
connectFailedSeen++;
amqplib.failConnections = false;
});

const [{ connection, url }] = await once(amqp, 'connect');
expect(disconnectEventsSeen).to.equal(1);
expect(connectFailedSeen).to.equal(1);
expect(amqp.connectionAttempts).to.equal(2);

// Verify that we round-robined to the next server, since the first was unavilable.
// Verify that we round-robined to the next server, since the first was unavailable.
expect(url, 'url').to.equal('amqp://rabbit2');
if (typeof url !== 'string') {
throw new Error('url is not a string');
Expand Down

0 comments on commit 0f05987

Please sign in to comment.