From 2e5eb7bce80023c1d2538235bdec36774ab47454 Mon Sep 17 00:00:00 2001 From: Andres Kalle Date: Wed, 31 Mar 2021 13:14:15 +0300 Subject: [PATCH 1/2] Detect failover from +switch-master messages --- .../SentinelConnector/FailoverDetector.ts | 73 ++++ lib/connectors/SentinelConnector/index.ts | 138 +++++++- lib/connectors/SentinelConnector/types.ts | 28 ++ lib/redis/RedisOptions.ts | 9 + lib/redis/index.ts | 5 +- test/functional/sentinel.ts | 326 ++++++++++++++++-- test/helpers/mock_server.ts | 4 + test/helpers/once.ts | 28 ++ 8 files changed, 574 insertions(+), 37 deletions(-) create mode 100644 lib/connectors/SentinelConnector/FailoverDetector.ts create mode 100644 test/helpers/once.ts diff --git a/lib/connectors/SentinelConnector/FailoverDetector.ts b/lib/connectors/SentinelConnector/FailoverDetector.ts new file mode 100644 index 00000000..a08a114f --- /dev/null +++ b/lib/connectors/SentinelConnector/FailoverDetector.ts @@ -0,0 +1,73 @@ +import { Debug } from "../../utils"; +import SentinelConnector from "./index"; +import { ISentinel } from "./types"; + +const debug = Debug("FailoverDetector"); + +const CHANNEL_NAME = "+switch-master"; + +export class FailoverDetector { + private connector: SentinelConnector; + private sentinels: ISentinel[]; + private isDisconnected = false; + + // sentinels can't be used for regular commands after this + constructor(connector: SentinelConnector, sentinels: ISentinel[]) { + this.connector = connector; + this.sentinels = sentinels; + } + + public cleanup() { + this.isDisconnected = true; + + for (const sentinel of this.sentinels) { + if (sentinel.isConnected) { + sentinel.getClient().disconnect(); + } + } + } + + public getClients() { + return this.sentinels.map((sentinel) => sentinel.getClient()); + } + + public async subscribe() { + debug("Starting FailoverDetector"); + + const promises: Promise[] = []; + + for (const sentinel of this.sentinels) { + const client = sentinel.getClient(); + + const promise = client.subscribe(CHANNEL_NAME).catch((err) => { + debug( + "Failed to subscribe to failover messages on sentinel %s:%s (%s)", + sentinel.address.host || "127.0.0.1", + sentinel.address.port || 26739, + err.message + ); + }); + + promises.push(promise); + + client.on("message", (channel: string) => { + if (!this.isDisconnected && channel === CHANNEL_NAME) { + this.disconnect(); + } + }); + } + + await Promise.all(promises); + } + + private disconnect() { + // Avoid disconnecting more than once per failover. + // A new FailoverDetector will be created after reconnecting. + this.isDisconnected = true; + + debug("Failover detected, disconnecting"); + + // Will call this.cleanup() + this.connector.disconnect(); + } +} diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 26d24bf0..b2e51ae5 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "events"; import { createConnection } from "net"; import { INatMap } from "../../cluster/ClusterOptions"; import { @@ -12,10 +13,11 @@ import { isIIpcConnectionOptions, } from "../StandaloneConnector"; import SentinelIterator from "./SentinelIterator"; -import { ISentinelAddress } from "./types"; +import { IRedisClient, ISentinelAddress, ISentinel } from "./types"; import AbstractConnector, { ErrorEmitter } from "../AbstractConnector"; import { NetStream } from "../../types"; import Redis from "../../redis"; +import { FailoverDetector } from "./FailoverDetector"; const debug = Debug("SentinelConnector"); @@ -39,6 +41,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { sentinelPassword?: string; sentinels: Array>; sentinelRetryStrategy?: (retryAttempts: number) => number | void | null; + sentinelReconnectStrategy?: (retryAttempts: number) => number | void | null; preferredSlaves?: PreferredSlaves; connectTimeout?: number; disconnectTimeout?: number; @@ -47,11 +50,14 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { sentinelTLS?: ConnectionOptions; natMap?: INatMap; updateSentinels?: boolean; + sentinelMaxConnections?: number; } export default class SentinelConnector extends AbstractConnector { private retryAttempts: number; + private failoverDetector: FailoverDetector | null = null; protected sentinelIterator: SentinelIterator; + public emitter: EventEmitter | null = null; constructor(protected options: ISentinelConnectionOptions) { super(options.disconnectTimeout); @@ -84,6 +90,14 @@ export default class SentinelConnector extends AbstractConnector { return roleMatches; } + public disconnect(): void { + super.disconnect(); + + if (this.failoverDetector) { + this.failoverDetector.cleanup(); + } + } + public connect(eventEmitter: ErrorEmitter): Promise { this.connecting = true; this.retryAttempts = 0; @@ -134,8 +148,15 @@ export default class SentinelConnector extends AbstractConnector { throw new Error(CONNECTION_CLOSED_ERROR_MSG); } + const endpointAddress = endpoint.value.host + ":" + endpoint.value.port; + if (resolved) { - debug("resolved: %s:%s", resolved.host, resolved.port); + debug( + "resolved: %s:%s from sentinel %s", + resolved.host, + resolved.port, + endpointAddress + ); if (this.options.enableTLSForSentinelMode && this.options.tls) { Object.assign(resolved, this.options.tls); this.stream = createTLSConnection(resolved); @@ -143,14 +164,14 @@ export default class SentinelConnector extends AbstractConnector { this.stream = createConnection(resolved); } + this.stream.once("connect", () => this.activateFailoverDetector()); + this.stream.once("error", (err) => { this.firstError = err; }); - this.sentinelIterator.reset(true); return this.stream; } else { - const endpointAddress = endpoint.value.host + ":" + endpoint.value.port; const errorMsg = err ? "failed to connect to sentinel " + endpointAddress + @@ -176,7 +197,7 @@ export default class SentinelConnector extends AbstractConnector { return connectToNext(); } - private async updateSentinels(client): Promise { + private async updateSentinels(client: IRedisClient): Promise { if (!this.options.updateSentinels) { return; } @@ -209,7 +230,9 @@ export default class SentinelConnector extends AbstractConnector { debug("Updated internal sentinels: %s", this.sentinelIterator); } - private async resolveMaster(client): Promise { + private async resolveMaster( + client: IRedisClient + ): Promise { const result = await client.sentinel( "get-master-addr-by-name", this.options.name @@ -224,7 +247,9 @@ export default class SentinelConnector extends AbstractConnector { ); } - private async resolveSlave(client): Promise { + private async resolveSlave( + client: IRedisClient + ): Promise { const result = await client.sentinel("slaves", this.options.name); if (!Array.isArray(result)) { @@ -251,8 +276,8 @@ export default class SentinelConnector extends AbstractConnector { return this.options.natMap[`${item.host}:${item.port}`] || item; } - private async resolve(endpoint): Promise { - const client = new Redis({ + private connectToSentinel(endpoint: Partial): IRedisClient { + return new Redis({ port: endpoint.port || 26379, host: endpoint.host, username: this.options.sentinelUsername || null, @@ -269,19 +294,108 @@ export default class SentinelConnector extends AbstractConnector { commandTimeout: this.options.sentinelCommandTimeout, dropBufferSupport: true, }); + } + + private async resolve( + endpoint: Partial + ): Promise { + const client = this.connectToSentinel(endpoint); // ignore the errors since resolve* methods will handle them client.on("error", noop); + let result: ITcpConnectionOptions | null = null; + try { if (this.options.role === "slave") { - return await this.resolveSlave(client); + result = await this.resolveSlave(client); } else { - return await this.resolveMaster(client); + result = await this.resolveMaster(client); } + + if (result) { + this.initFailoverDetector({ + address: endpoint, + isConnected: true, + getClient: () => client, + }); + } + + return result; } finally { - client.disconnect(); + if (!result) { + // Only disconnect if we didn't get a result. + // Otherwise we'll use this connection for failover detection. + client.disconnect(); + } + } + } + + private initFailoverDetector(firstSentinel: ISentinel): void { + // Move the current sentinel to the first position + this.sentinelIterator.reset(true); + + const sentinels: ISentinel[] = [firstSentinel]; + + // Skip the first sentinel that we've already connected to + this.sentinelIterator.next(); + + // In case of a large amount of sentinels, limit the number of concurrent connections + while (sentinels.length < this.options.sentinelMaxConnections) { + const { done, value } = this.sentinelIterator.next(); + + if (done) { + break; + } + + let client: IRedisClient | null = null; + + const sentinel = { + address: value, + isConnected: false, + getClient: () => { + if (!client) { + client = this.connectToSentinel(value); + sentinel.isConnected = true; + } + + return client; + }, + }; + + sentinels.push(sentinel); } + + this.sentinelIterator.reset(false); + + if (this.failoverDetector) { + // Clean up previous detector + this.failoverDetector.cleanup(); + } + + this.failoverDetector = new FailoverDetector(this, sentinels); + } + + private async activateFailoverDetector() { + if (!this.failoverDetector) { + return; + } + + for (const client of this.failoverDetector.getClients()) { + // Apply reconnect strategy to sentinels now that we're no longer looking for master + client.options.retryStrategy = this.options.sentinelReconnectStrategy; + + // Tests listen to this event + client.on("reconnecting", () => { + this.emitter?.emit("sentinelReconnecting"); + }); + } + + // The sentinel clients can't be used for regular commands after this + await this.failoverDetector.subscribe(); + + // Tests listen to this event + this.emitter?.emit("failoverSubscribed"); } } diff --git a/lib/connectors/SentinelConnector/types.ts b/lib/connectors/SentinelConnector/types.ts index 14f0fba8..bb890e1c 100644 --- a/lib/connectors/SentinelConnector/types.ts +++ b/lib/connectors/SentinelConnector/types.ts @@ -1,4 +1,32 @@ +import { IRedisOptions } from "../../redis/RedisOptions"; + export interface ISentinelAddress { port: number; host: string; + family?: number; +} + +// TODO: A proper typedef. This one only declares a small subset of all the members. +export interface IRedisClient { + options: IRedisOptions; + sentinel(subcommand: "sentinels", name: string): Promise; + sentinel( + subcommand: "get-master-addr-by-name", + name: string + ): Promise; + sentinel(subcommand: "slaves", name: string): Promise; + subscribe(...channelNames: string[]): Promise; + on( + event: "message", + callback: (channel: string, message: string) => void + ): void; + on(event: "error", callback: (error: Error) => void): void; + on(event: "reconnecting", callback: () => void): void; + disconnect(): void; +} + +export interface ISentinel { + address: Partial; + isConnected: boolean; + getClient: () => IRedisClient; } diff --git a/lib/redis/RedisOptions.ts b/lib/redis/RedisOptions.ts index 33971d14..3e813e42 100644 --- a/lib/redis/RedisOptions.ts +++ b/lib/redis/RedisOptions.ts @@ -52,6 +52,14 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = { sentinelRetryStrategy: function (times) { return Math.min(times * 10, 1000); }, + sentinelReconnectStrategy: function () { + // This strategy only applies when sentinels are used for detecting + // a failover, not during initial master resolution. + // The deployment can still function when some of the sentinels are down + // for a long period of time, so we may not want to attempt reconnection + // very often. Therefore the default interval is fairly long (1 minute). + return 60000; + }, natMap: null, enableTLSForSentinelMode: false, updateSentinels: true, @@ -75,4 +83,5 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = { enableAutoPipelining: false, autoPipeliningIgnoredCommands: [], maxScriptsCachingTime: 60000, + sentinelMaxConnections: 10, }; diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 57459990..88828e7b 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -153,7 +153,10 @@ function Redis() { if (this.options.Connector) { this.connector = new this.options.Connector(this.options); } else if (this.options.sentinels) { - this.connector = new SentinelConnector(this.options); + const sentinelConnector = new SentinelConnector(this.options); + sentinelConnector.emitter = this; + + this.connector = sentinelConnector; } else { this.connector = new StandaloneConnector(this.options); } diff --git a/test/functional/sentinel.ts b/test/functional/sentinel.ts index 204f35cb..68834610 100644 --- a/test/functional/sentinel.ts +++ b/test/functional/sentinel.ts @@ -1,8 +1,17 @@ +import { Socket } from "net"; + import Redis from "../../lib/redis"; import MockServer from "../helpers/mock_server"; +import { once } from "../helpers/once"; import { expect } from "chai"; import * as sinon from "sinon"; +function triggerParseError(socket: Socket) { + // Valid first characters: '$', '+', '*', ':', '-' + // To trigger an error, we need to write a different character + socket.write("A"); +} + describe("sentinel", function () { describe("connect", function () { it("should connect to sentinel successfully", function (done) { @@ -47,6 +56,41 @@ describe("sentinel", function () { }); }); + it("should skip an unresponsive sentinel", async function () { + const sentinel1 = new MockServer(27379, function (argv, socket, flags) { + flags.hang = true; + }); + + const sentinel2 = new MockServer(27380, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + const master = new MockServer(17380); + const clock = sinon.useFakeTimers(); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + ], + name: "master", + sentinelCommandTimeout: 1000, + }); + + clock.tick(1000); + clock.restore(); + await once(master, "connect"); + + redis.disconnect(); + await Promise.all([ + sentinel1.disconnectPromise(), + sentinel2.disconnectPromise(), + master.disconnectPromise(), + ]); + }); + it("should call sentinelRetryStrategy when all sentinels are unreachable", function (done) { let t = 0; var redis = new Redis({ @@ -100,18 +144,11 @@ describe("sentinel", function () { } }); - it("should close the connection to the sentinel when resolving successfully", function (done) { - const sentinel = new MockServer(27379, function (argv) { - if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { - return ["127.0.0.1", "17380"]; - } - }); - const master = new MockServer(17380); + it("should close the connection to the sentinel when resolving unsuccessfully", function (done) { + const sentinel = new MockServer(27379); // Does not respond properly to get-master-addr-by-name sentinel.once("disconnect", function () { redis.disconnect(); - master.disconnect(function () { - sentinel.disconnect(done); - }); + sentinel.disconnect(done); }); var redis = new Redis({ @@ -140,18 +177,19 @@ describe("sentinel", function () { } }); const master = new MockServer(17380); - sentinel.once("disconnect", function () { + + const redis = new Redis({ + sentinels: sentinels, + name: "master", + }); + + redis.on("ready", function () { redis.disconnect(); master.disconnect(function () { expect(cloned.length).to.eql(2); sentinel.disconnect(done); }); }); - - var redis = new Redis({ - sentinels: sentinels, - name: "master", - }); }); it("should skip additionally discovered sentinels even if they are resolved successfully", function (done) { @@ -168,7 +206,14 @@ describe("sentinel", function () { } }); const master = new MockServer(17380); - sentinel.once("disconnect", function () { + + const redis = new Redis({ + sentinels: sentinels, + updateSentinels: false, + name: "master", + }); + + redis.on("ready", function () { redis.disconnect(); master.disconnect(function () { expect(sentinels.length).to.eql(1); @@ -176,13 +221,8 @@ describe("sentinel", function () { sentinel.disconnect(done); }); }); - - var redis = new Redis({ - sentinels: sentinels, - updateSentinels: false, - name: "master", - }); }); + it("should connect to sentinel with authentication successfully", function (done) { let authed = false; var redisServer = new MockServer(17380, function (argv) { @@ -488,7 +528,7 @@ describe("sentinel", function () { } }); const master = new MockServer(17380); - master.on("connect", function (c) { + master.on("connect", function (c: Socket) { c.destroy(); master.disconnect(); redis.get("foo", function (err, res) { @@ -515,5 +555,243 @@ describe("sentinel", function () { name: "master", }); }); + + it("should connect to new master after +switch-master", async function () { + const sentinel = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [{ host: "127.0.0.1", port: 27379 }], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + sentinel.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + sentinel.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + sentinel.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover from secondary sentinel", async function () { + const sentinel1 = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + const sentinel2 = new MockServer(27380); + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + ], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + // In this test, only the first sentinel is used to resolve the master + sentinel1.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + // But only the second sentinel broadcasts +switch-master + sentinel2.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + sentinel1.disconnectPromise(), + sentinel2.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover when some sentinels fail", async function () { + // Will disconnect before failover + const sentinel1 = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + // Will emit an error before failover + let sentinel2Socket: Socket | null = null; + const sentinel2 = new MockServer(27380, function (argv, socket) { + sentinel2Socket = socket; + }); + + // Fails to subscribe + const sentinel3 = new MockServer(27381, function (argv, socket, flags) { + if (argv[0] === "subscribe") { + triggerParseError(socket); + } + }); + + // The only sentinel that can successfully publish the failover message + const sentinel4 = new MockServer(27382); + + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + { host: "127.0.0.1", port: 27381 }, + { host: "127.0.0.1", port: 27382 }, + ], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + + // Must resolve even though subscribing to sentinel3 fails + once(redis, "failoverSubscribed"), + ]); + + // Fail sentinels 1 and 2 + await sentinel1.disconnectPromise(); + triggerParseError(sentinel2Socket); + + sentinel4.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + sentinel4.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + // sentinel1 is already disconnected + sentinel2.disconnectPromise(), + sentinel3.disconnectPromise(), + sentinel4.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover after sentinel disconnects and reconnects", async function () { + const sentinel = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [{ host: "127.0.0.1", port: 27379 }], + name: "master", + sentinelReconnectStrategy: () => 1000, + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + await sentinel.disconnectPromise(); + + sentinel.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + if (argv[0] === "subscribe") { + sentinel.emit("test:resubscribed"); // Custom event only used in tests + } + }; + + sentinel.connect(); + + const clock = sinon.useFakeTimers(); + await once(redis, "sentinelReconnecting"); // Wait for the timeout to be set + clock.tick(1000); + clock.restore(); + await once(sentinel, "test:resubscribed"); + + sentinel.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + sentinel.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); }); }); diff --git a/test/helpers/mock_server.ts b/test/helpers/mock_server.ts index 5101f064..2fe07a28 100644 --- a/test/helpers/mock_server.ts +++ b/test/helpers/mock_server.ts @@ -124,6 +124,10 @@ export default class MockServer extends EventEmitter { this.socket.destroy(callback); } + disconnectPromise() { + return new Promise((resolve) => this.disconnect(resolve)); + } + broadcast(data: any) { this.clients .filter((c) => c) diff --git a/test/helpers/once.ts b/test/helpers/once.ts new file mode 100644 index 00000000..106524f0 --- /dev/null +++ b/test/helpers/once.ts @@ -0,0 +1,28 @@ +// TODO: use 'import { once } from "events";' instead of this +// after upgrading minimum Node.js version to 10.16+ + +// This polyfill is from https://github.com/davidmarkclements/events.once + +import EventEmitter from "events"; + +export const once = ( + emitter: EventEmitter, + name: string +): Promise => { + return new Promise((resolve, reject) => { + const onceError = name === "error"; + const listener = onceError + ? resolve + : (...args: any[]) => { + emitter.removeListener("error", error); + resolve(args as T); + }; + emitter.once(name, listener); + if (onceError) return; + const error = (err: any) => { + emitter.removeListener(name, listener); + reject(err); + }; + emitter.once("error", error); + }); +}; From 26a076d83ac41e9d20a5f5d49ad5cd26fa1fb657 Mon Sep 17 00:00:00 2001 From: Andres Kalle Date: Mon, 19 Apr 2021 11:47:00 +0300 Subject: [PATCH 2/2] Addressed feedback --- .../SentinelConnector/FailoverDetector.ts | 14 +--- lib/connectors/SentinelConnector/index.ts | 79 +++++-------------- lib/connectors/SentinelConnector/types.ts | 3 +- 3 files changed, 25 insertions(+), 71 deletions(-) diff --git a/lib/connectors/SentinelConnector/FailoverDetector.ts b/lib/connectors/SentinelConnector/FailoverDetector.ts index a08a114f..2b3a9d9e 100644 --- a/lib/connectors/SentinelConnector/FailoverDetector.ts +++ b/lib/connectors/SentinelConnector/FailoverDetector.ts @@ -21,25 +21,17 @@ export class FailoverDetector { this.isDisconnected = true; for (const sentinel of this.sentinels) { - if (sentinel.isConnected) { - sentinel.getClient().disconnect(); - } + sentinel.client.disconnect(); } } - public getClients() { - return this.sentinels.map((sentinel) => sentinel.getClient()); - } - public async subscribe() { debug("Starting FailoverDetector"); const promises: Promise[] = []; for (const sentinel of this.sentinels) { - const client = sentinel.getClient(); - - const promise = client.subscribe(CHANNEL_NAME).catch((err) => { + const promise = sentinel.client.subscribe(CHANNEL_NAME).catch((err) => { debug( "Failed to subscribe to failover messages on sentinel %s:%s (%s)", sentinel.address.host || "127.0.0.1", @@ -50,7 +42,7 @@ export class FailoverDetector { promises.push(promise); - client.on("message", (channel: string) => { + sentinel.client.on("message", (channel: string) => { if (!this.isDisconnected && channel === CHANNEL_NAME) { this.disconnect(); } diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index b2e51ae5..8a92417c 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -17,6 +17,7 @@ import { IRedisClient, ISentinelAddress, ISentinel } from "./types"; import AbstractConnector, { ErrorEmitter } from "../AbstractConnector"; import { NetStream } from "../../types"; import Redis from "../../redis"; +import { IRedisOptions } from "../../redis/RedisOptions"; import { FailoverDetector } from "./FailoverDetector"; const debug = Debug("SentinelConnector"); @@ -164,7 +165,7 @@ export default class SentinelConnector extends AbstractConnector { this.stream = createConnection(resolved); } - this.stream.once("connect", () => this.activateFailoverDetector()); + this.stream.once("connect", () => this.initFailoverDetector()); this.stream.once("error", (err) => { this.firstError = err; @@ -276,7 +277,10 @@ export default class SentinelConnector extends AbstractConnector { return this.options.natMap[`${item.host}:${item.port}`] || item; } - private connectToSentinel(endpoint: Partial): IRedisClient { + private connectToSentinel( + endpoint: Partial, + options?: Partial + ): IRedisClient { return new Redis({ port: endpoint.port || 26379, host: endpoint.host, @@ -293,6 +297,7 @@ export default class SentinelConnector extends AbstractConnector { connectTimeout: this.options.connectTimeout, commandTimeout: this.options.sentinelCommandTimeout, dropBufferSupport: true, + ...options, }); } @@ -304,41 +309,22 @@ export default class SentinelConnector extends AbstractConnector { // ignore the errors since resolve* methods will handle them client.on("error", noop); - let result: ITcpConnectionOptions | null = null; - try { if (this.options.role === "slave") { - result = await this.resolveSlave(client); + return await this.resolveSlave(client); } else { - result = await this.resolveMaster(client); - } - - if (result) { - this.initFailoverDetector({ - address: endpoint, - isConnected: true, - getClient: () => client, - }); + return await this.resolveMaster(client); } - - return result; } finally { - if (!result) { - // Only disconnect if we didn't get a result. - // Otherwise we'll use this connection for failover detection. - client.disconnect(); - } + client.disconnect(); } } - private initFailoverDetector(firstSentinel: ISentinel): void { + private async initFailoverDetector(): Promise { // Move the current sentinel to the first position this.sentinelIterator.reset(true); - const sentinels: ISentinel[] = [firstSentinel]; - - // Skip the first sentinel that we've already connected to - this.sentinelIterator.next(); + const sentinels: ISentinel[] = []; // In case of a large amount of sentinels, limit the number of concurrent connections while (sentinels.length < this.options.sentinelMaxConnections) { @@ -348,22 +334,17 @@ export default class SentinelConnector extends AbstractConnector { break; } - let client: IRedisClient | null = null; - - const sentinel = { - address: value, - isConnected: false, - getClient: () => { - if (!client) { - client = this.connectToSentinel(value); - sentinel.isConnected = true; - } + const client = this.connectToSentinel(value, { + lazyConnect: true, + retryStrategy: this.options.sentinelReconnectStrategy, + }); - return client; - }, - }; + client.on("reconnecting", () => { + // Tests listen to this event + this.emitter?.emit("sentinelReconnecting"); + }); - sentinels.push(sentinel); + sentinels.push({ address: value, client }); } this.sentinelIterator.reset(false); @@ -374,24 +355,6 @@ export default class SentinelConnector extends AbstractConnector { } this.failoverDetector = new FailoverDetector(this, sentinels); - } - - private async activateFailoverDetector() { - if (!this.failoverDetector) { - return; - } - - for (const client of this.failoverDetector.getClients()) { - // Apply reconnect strategy to sentinels now that we're no longer looking for master - client.options.retryStrategy = this.options.sentinelReconnectStrategy; - - // Tests listen to this event - client.on("reconnecting", () => { - this.emitter?.emit("sentinelReconnecting"); - }); - } - - // The sentinel clients can't be used for regular commands after this await this.failoverDetector.subscribe(); // Tests listen to this event diff --git a/lib/connectors/SentinelConnector/types.ts b/lib/connectors/SentinelConnector/types.ts index bb890e1c..6d7ef7f4 100644 --- a/lib/connectors/SentinelConnector/types.ts +++ b/lib/connectors/SentinelConnector/types.ts @@ -27,6 +27,5 @@ export interface IRedisClient { export interface ISentinel { address: Partial; - isConnected: boolean; - getClient: () => IRedisClient; + client: IRedisClient; }