Skip to content

Commit

Permalink
feat: manual reconnect
Browse files Browse the repository at this point in the history
Useful when e.g. a consumer is cancelled.
  • Loading branch information
luddd3 committed Aug 26, 2021
1 parent 1c47f6f commit 798b45f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/AmqpConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,31 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp
return !!this._currentConnection;
}

/** Force reconnect - noop unless connected */
reconnect(): void {
if (this._closed) {
throw new Error('cannot reconnect after close');
}

// If we have a connection, close it and immediately connect again.
// Wait for ordinary reconnect otherwise.
if (this._currentConnection) {
this._currentConnection.removeAllListeners();
this._currentConnection
.close()
.catch(() => {
// noop
})
.then(() => {
this._currentConnection = undefined;
this._disconnectSent = true;
this.emit('disconnect', { err: new Error('forced reconnect') });
return this._connect();
})
.catch(neverThrows);
}
}

/** The current connection. */
get connection(): Connection | undefined {
return this._currentConnection;
Expand Down
17 changes: 17 additions & 0 deletions test/AmqpConnectionManagerTest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import origAmpq from 'amqplib';
import chai from 'chai';
import chaiString from 'chai-string';
import { once } from 'events';
import * as promiseTools from 'promise-tools';
import AmqpConnectionManager from '../src/AmqpConnectionManager';
import { FakeAmqp, FakeConnection } from './fixtures';
Expand Down Expand Up @@ -324,6 +325,22 @@ describe('AmqpConnectionManager', function () {
);
}));

it('should be able to manually reconnect', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');

amqp.reconnect();
await once(amqp, 'disconnect');
await once(amqp, 'connect');
});

it('should throw on manual reconnect after close', async () => {
amqp = new AmqpConnectionManager('amqp://localhost');
await once(amqp, 'connect');
await amqp.close()
expect(amqp.reconnect).to.throw()
})

it('should create and clean up channel wrappers', async function () {
amqp = new AmqpConnectionManager('amqp://localhost');
const channel = amqp.createChannel({ name: 'test-chan' });
Expand Down

0 comments on commit 798b45f

Please sign in to comment.