Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect failover from +switch-master messages #1328

Merged
merged 3 commits into from
Apr 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions lib/connectors/SentinelConnector/FailoverDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Debug } from "../../utils";
import SentinelConnector from "./index";
import { ISentinel } from "./types";

const debug = Debug("FailoverDetector");

const CHANNEL_NAME = "+switch-master";

export class FailoverDetector {
private connector: SentinelConnector;
private sentinels: ISentinel[];
private isDisconnected = false;

// sentinels can't be used for regular commands after this
constructor(connector: SentinelConnector, sentinels: ISentinel[]) {
this.connector = connector;
this.sentinels = sentinels;
}

public cleanup() {
this.isDisconnected = true;

for (const sentinel of this.sentinels) {
sentinel.client.disconnect();
}
}

public async subscribe() {
debug("Starting FailoverDetector");

const promises: Promise<unknown>[] = [];

for (const sentinel of this.sentinels) {
const promise = sentinel.client.subscribe(CHANNEL_NAME).catch((err) => {
debug(
"Failed to subscribe to failover messages on sentinel %s:%s (%s)",
sentinel.address.host || "127.0.0.1",
sentinel.address.port || 26739,
err.message
);
});

promises.push(promise);

sentinel.client.on("message", (channel: string) => {
if (!this.isDisconnected && channel === CHANNEL_NAME) {
this.disconnect();
}
});
}

await Promise.all(promises);
}

private disconnect() {
// Avoid disconnecting more than once per failover.
// A new FailoverDetector will be created after reconnecting.
this.isDisconnected = true;

debug("Failover detected, disconnecting");

// Will call this.cleanup()
this.connector.disconnect();
}
}
95 changes: 86 additions & 9 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from "events";
import { createConnection } from "net";
import { INatMap } from "../../cluster/ClusterOptions";
import {
Expand All @@ -12,10 +13,12 @@ import {
isIIpcConnectionOptions,
} from "../StandaloneConnector";
import SentinelIterator from "./SentinelIterator";
import { ISentinelAddress } from "./types";
import { IRedisClient, ISentinelAddress, ISentinel } from "./types";
import AbstractConnector, { ErrorEmitter } from "../AbstractConnector";
import { NetStream } from "../../types";
import Redis from "../../redis";
import { IRedisOptions } from "../../redis/RedisOptions";
import { FailoverDetector } from "./FailoverDetector";

const debug = Debug("SentinelConnector");

Expand All @@ -39,6 +42,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
sentinelPassword?: string;
sentinels: Array<Partial<ISentinelAddress>>;
sentinelRetryStrategy?: (retryAttempts: number) => number | void | null;
sentinelReconnectStrategy?: (retryAttempts: number) => number | void | null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this name, could be confusing.

But I'm planning to submit a PR that documents all the sentinel options after this gets merged, so that might help.

preferredSlaves?: PreferredSlaves;
connectTimeout?: number;
disconnectTimeout?: number;
Expand All @@ -47,11 +51,14 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
sentinelTLS?: ConnectionOptions;
natMap?: INatMap;
updateSentinels?: boolean;
sentinelMaxConnections?: number;
}

export default class SentinelConnector extends AbstractConnector {
private retryAttempts: number;
private failoverDetector: FailoverDetector | null = null;
protected sentinelIterator: SentinelIterator;
public emitter: EventEmitter | null = null;

constructor(protected options: ISentinelConnectionOptions) {
super(options.disconnectTimeout);
Expand Down Expand Up @@ -84,6 +91,14 @@ export default class SentinelConnector extends AbstractConnector {
return roleMatches;
}

public disconnect(): void {
super.disconnect();

if (this.failoverDetector) {
this.failoverDetector.cleanup();
}
}

public connect(eventEmitter: ErrorEmitter): Promise<NetStream> {
this.connecting = true;
this.retryAttempts = 0;
Expand Down Expand Up @@ -134,23 +149,30 @@ export default class SentinelConnector extends AbstractConnector {
throw new Error(CONNECTION_CLOSED_ERROR_MSG);
}

const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;

if (resolved) {
debug("resolved: %s:%s", resolved.host, resolved.port);
debug(
"resolved: %s:%s from sentinel %s",
resolved.host,
resolved.port,
endpointAddress
);
if (this.options.enableTLSForSentinelMode && this.options.tls) {
Object.assign(resolved, this.options.tls);
this.stream = createTLSConnection(resolved);
} else {
this.stream = createConnection(resolved);
}

this.stream.once("connect", () => this.initFailoverDetector());

this.stream.once("error", (err) => {
this.firstError = err;
});

this.sentinelIterator.reset(true);
return this.stream;
} else {
const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
const errorMsg = err
? "failed to connect to sentinel " +
endpointAddress +
Expand All @@ -176,7 +198,7 @@ export default class SentinelConnector extends AbstractConnector {
return connectToNext();
}

private async updateSentinels(client): Promise<void> {
private async updateSentinels(client: IRedisClient): Promise<void> {
if (!this.options.updateSentinels) {
return;
}
Expand Down Expand Up @@ -209,7 +231,9 @@ export default class SentinelConnector extends AbstractConnector {
debug("Updated internal sentinels: %s", this.sentinelIterator);
}

private async resolveMaster(client): Promise<ITcpConnectionOptions | null> {
private async resolveMaster(
client: IRedisClient
): Promise<ITcpConnectionOptions | null> {
const result = await client.sentinel(
"get-master-addr-by-name",
this.options.name
Expand All @@ -224,7 +248,9 @@ export default class SentinelConnector extends AbstractConnector {
);
}

private async resolveSlave(client): Promise<ITcpConnectionOptions | null> {
private async resolveSlave(
client: IRedisClient
): Promise<ITcpConnectionOptions | null> {
const result = await client.sentinel("slaves", this.options.name);

if (!Array.isArray(result)) {
Expand All @@ -251,8 +277,11 @@ export default class SentinelConnector extends AbstractConnector {
return this.options.natMap[`${item.host}:${item.port}`] || item;
}

private async resolve(endpoint): Promise<ITcpConnectionOptions | null> {
const client = new Redis({
private connectToSentinel(
endpoint: Partial<ISentinelAddress>,
options?: Partial<IRedisOptions>
): IRedisClient {
return new Redis({
port: endpoint.port || 26379,
host: endpoint.host,
username: this.options.sentinelUsername || null,
Expand All @@ -268,7 +297,14 @@ export default class SentinelConnector extends AbstractConnector {
connectTimeout: this.options.connectTimeout,
commandTimeout: this.options.sentinelCommandTimeout,
dropBufferSupport: true,
...options,
});
}

private async resolve(
endpoint: Partial<ISentinelAddress>
): Promise<ITcpConnectionOptions | null> {
const client = this.connectToSentinel(endpoint);

// ignore the errors since resolve* methods will handle them
client.on("error", noop);
Expand All @@ -283,6 +319,47 @@ export default class SentinelConnector extends AbstractConnector {
client.disconnect();
}
}

private async initFailoverDetector(): Promise<void> {
// Move the current sentinel to the first position
this.sentinelIterator.reset(true);

const sentinels: ISentinel[] = [];

// In case of a large amount of sentinels, limit the number of concurrent connections
while (sentinels.length < this.options.sentinelMaxConnections) {
const { done, value } = this.sentinelIterator.next();

if (done) {
break;
}

const client = this.connectToSentinel(value, {
lazyConnect: true,
retryStrategy: this.options.sentinelReconnectStrategy,
});

client.on("reconnecting", () => {
// Tests listen to this event
this.emitter?.emit("sentinelReconnecting");
});

sentinels.push({ address: value, client });
}

this.sentinelIterator.reset(false);

if (this.failoverDetector) {
// Clean up previous detector
this.failoverDetector.cleanup();
}

this.failoverDetector = new FailoverDetector(this, sentinels);
await this.failoverDetector.subscribe();

// Tests listen to this event
this.emitter?.emit("failoverSubscribed");
}
}

function selectPreferredSentinel(
Expand Down
27 changes: 27 additions & 0 deletions lib/connectors/SentinelConnector/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
import { IRedisOptions } from "../../redis/RedisOptions";

export interface ISentinelAddress {
port: number;
host: string;
family?: number;
}

// TODO: A proper typedef. This one only declares a small subset of all the members.
export interface IRedisClient {
options: IRedisOptions;
sentinel(subcommand: "sentinels", name: string): Promise<string[]>;
sentinel(
subcommand: "get-master-addr-by-name",
name: string
): Promise<string[]>;
sentinel(subcommand: "slaves", name: string): Promise<string[]>;
subscribe(...channelNames: string[]): Promise<number>;
on(
event: "message",
callback: (channel: string, message: string) => void
): void;
on(event: "error", callback: (error: Error) => void): void;
on(event: "reconnecting", callback: () => void): void;
disconnect(): void;
}

export interface ISentinel {
address: Partial<ISentinelAddress>;
client: IRedisClient;
}
9 changes: 9 additions & 0 deletions lib/redis/RedisOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = {
sentinelRetryStrategy: function (times) {
return Math.min(times * 10, 1000);
},
sentinelReconnectStrategy: function () {
// This strategy only applies when sentinels are used for detecting
// a failover, not during initial master resolution.
// The deployment can still function when some of the sentinels are down
// for a long period of time, so we may not want to attempt reconnection
// very often. Therefore the default interval is fairly long (1 minute).
return 60000;
},
natMap: null,
enableTLSForSentinelMode: false,
updateSentinels: true,
Expand All @@ -75,4 +83,5 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = {
enableAutoPipelining: false,
autoPipeliningIgnoredCommands: [],
maxScriptsCachingTime: 60000,
sentinelMaxConnections: 10,
};
5 changes: 4 additions & 1 deletion lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ function Redis() {
if (this.options.Connector) {
this.connector = new this.options.Connector(this.options);
} else if (this.options.sentinels) {
this.connector = new SentinelConnector(this.options);
const sentinelConnector = new SentinelConnector(this.options);
sentinelConnector.emitter = this;

this.connector = sentinelConnector;
} else {
this.connector = new StandaloneConnector(this.options);
}
Expand Down
Loading