From 798b45f52c437f35a0f89b431b872354a7a3eb0e Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Thu, 26 Aug 2021 09:34:47 +0200 Subject: [PATCH] feat: manual reconnect Useful when e.g. a consumer is cancelled. --- src/AmqpConnectionManager.ts | 25 +++++++++++++++++++++++++ test/AmqpConnectionManagerTest.ts | 17 +++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/src/AmqpConnectionManager.ts b/src/AmqpConnectionManager.ts index 43500d6..e0eb6df 100644 --- a/src/AmqpConnectionManager.ts +++ b/src/AmqpConnectionManager.ts @@ -245,6 +245,31 @@ export default class AmqpConnectionManager extends EventEmitter implements IAmqp return !!this._currentConnection; } + /** Force reconnect - noop unless connected */ + reconnect(): void { + if (this._closed) { + throw new Error('cannot reconnect after close'); + } + + // If we have a connection, close it and immediately connect again. + // Wait for ordinary reconnect otherwise. + if (this._currentConnection) { + this._currentConnection.removeAllListeners(); + this._currentConnection + .close() + .catch(() => { + // noop + }) + .then(() => { + this._currentConnection = undefined; + this._disconnectSent = true; + this.emit('disconnect', { err: new Error('forced reconnect') }); + return this._connect(); + }) + .catch(neverThrows); + } + } + /** The current connection. */ get connection(): Connection | undefined { return this._currentConnection; diff --git a/test/AmqpConnectionManagerTest.ts b/test/AmqpConnectionManagerTest.ts index e297aa3..7e9ce61 100644 --- a/test/AmqpConnectionManagerTest.ts +++ b/test/AmqpConnectionManagerTest.ts @@ -1,6 +1,7 @@ import origAmpq from 'amqplib'; import chai from 'chai'; import chaiString from 'chai-string'; +import { once } from 'events'; import * as promiseTools from 'promise-tools'; import AmqpConnectionManager from '../src/AmqpConnectionManager'; import { FakeAmqp, FakeConnection } from './fixtures'; @@ -324,6 +325,22 @@ describe('AmqpConnectionManager', function () { ); })); + it('should be able to manually reconnect', async () => { + amqp = new AmqpConnectionManager('amqp://localhost'); + await once(amqp, 'connect'); + + amqp.reconnect(); + await once(amqp, 'disconnect'); + await once(amqp, 'connect'); + }); + + it('should throw on manual reconnect after close', async () => { + amqp = new AmqpConnectionManager('amqp://localhost'); + await once(amqp, 'connect'); + await amqp.close() + expect(amqp.reconnect).to.throw() + }) + it('should create and clean up channel wrappers', async function () { amqp = new AmqpConnectionManager('amqp://localhost'); const channel = amqp.createChannel({ name: 'test-chan' });