From bf3fe29ec78df184b359ee29468dbb058e886e59 Mon Sep 17 00:00:00 2001 From: Matias Lopez Date: Tue, 25 Jun 2019 10:59:35 -0400 Subject: [PATCH] feat: support custom connectors (#906) --- .eslintrc | 3 +- .travis.yml | 1 + examples/basic_operations.js | 4 +- examples/custom_connector.js | 53 ++++++++++++++ lib/connectors/AbstractConnector.ts | 2 +- .../SentinelConnector/SentinelIterator.ts | 14 ++-- lib/connectors/SentinelConnector/index.ts | 70 ++++++++++--------- lib/connectors/StandaloneConnector.ts | 40 ++++++----- lib/index.ts | 19 +++-- lib/redis/RedisOptions.ts | 2 + lib/redis/event_handler.ts | 2 +- lib/redis/index.ts | 24 ++++--- package.json | 4 +- 13 files changed, 154 insertions(+), 84 deletions(-) create mode 100644 examples/custom_connector.js diff --git a/.eslintrc b/.eslintrc index 1f01f7d5..85f55bfe 100644 --- a/.eslintrc +++ b/.eslintrc @@ -1,6 +1,7 @@ { "env": { - "node": true + "node": true, + "es6": true }, "rules": { "no-const-assign": 2, diff --git a/.travis.yml b/.travis.yml index 583f6ced..ed822692 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ services: - redis-server script: +- npm run build - npm run test:cov || npm run test:cov || npm run test:cov env: diff --git a/examples/basic_operations.js b/examples/basic_operations.js index 7f45edf7..36cb6f31 100644 --- a/examples/basic_operations.js +++ b/examples/basic_operations.js @@ -27,6 +27,4 @@ redis.sadd('set', [1, 3, 5, 7]); redis.set('key', 100, 'EX', 10); // Change the server configuration -redis.config('set', 'notify-keyspace-events', 'KEA') - - +redis.config('set', 'notify-keyspace-events', 'KEA'); diff --git a/examples/custom_connector.js b/examples/custom_connector.js new file mode 100644 index 00000000..fbe811fe --- /dev/null +++ b/examples/custom_connector.js @@ -0,0 +1,53 @@ +'use strict'; + +const Redis = require('ioredis'); +const MyService = require('path/to/my/service'); + +// Create a custom connector that fetches sentinels from an external call +class AsyncSentinelConnector extends Redis.SentinelConnector { + constructor(options = {}) { + // Placeholder + options.sentinels = options.sentinels || [{ host: 'localhost', port: 6379 }]; + + // SentinelConnector saves options as its property + super(options); + } + + connect(eventEmitter) { + return MyService.getSentinels().then(sentinels => { + this.options.sentinels = sentinels; + this.sentinelIterator = new Redis.SentinelIterator(sentinels); + return Redis.SentinelConnector.prototype.connect.call(this, eventEmitter); + }); + } +} + +const redis = new Redis({ + connector: new AsyncSentinelConnector() +}); + +// ioredis supports all Redis commands: +redis.set('foo', 'bar'); +redis.get('foo', function (err, result) { + if (err) { + console.error(err); + } else { + console.log(result); + } +}); +redis.del('foo'); + +// Or using a promise if the last argument isn't a function +redis.get('foo').then(function (result) { + console.log(result); +}); + +// Arguments to commands are flattened, so the following are the same: +redis.sadd('set', 1, 3, 5, 7); +redis.sadd('set', [1, 3, 5, 7]); + +// All arguments are passed directly to the redis server: +redis.set('key', 100, 'EX', 10); + +// Change the server configuration +redis.config('set', 'notify-keyspace-events', 'KEA'); diff --git a/lib/connectors/AbstractConnector.ts b/lib/connectors/AbstractConnector.ts index 6a7aa9c3..f608cb45 100644 --- a/lib/connectors/AbstractConnector.ts +++ b/lib/connectors/AbstractConnector.ts @@ -17,5 +17,5 @@ export default abstract class AbstractConnector { } } - public abstract connect (callback: Function, _: ErrorEmitter) + public abstract connect (_: ErrorEmitter): Promise } diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts index 6cfdaf19..5ce55651 100644 --- a/lib/connectors/SentinelConnector/SentinelIterator.ts +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -5,23 +5,19 @@ function isSentinelEql (a: Partial, b: Partial> { private cursor: number = 0 constructor (private sentinels: Partial[]) {} - hasNext (): boolean { - return this.cursor < this.sentinels.length - } - - next (): Partial | null { - return this.hasNext() ? this.sentinels[this.cursor++] : null + next () { + const done = this.cursor >= this.sentinels.length + return { done, value: done ? undefined : this.sentinels[this.cursor++] } } reset (moveCurrentEndpointToFirst: boolean): void { if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) { - const remains = this.sentinels.slice(this.cursor - 1) - this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1)) + this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1)) } this.cursor = 0 } diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 3ca9af78..8e037973 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -6,7 +6,8 @@ import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConne import SentinelIterator from './SentinelIterator' import {ISentinelAddress} from './types'; import AbstractConnector, { ErrorEmitter } from '../AbstractConnector' -import {NetStream} from '../../types' +import {NetStream, CallbackFunction} from '../../types'; +import * as PromiseContainer from '../../promiseContainer'; import Redis from '../../redis' const debug = Debug('SentinelConnector') @@ -17,12 +18,13 @@ interface IAddressFromResponse { flags?: string } -type NodeCallback = (err: Error | null, result?: T) => void type PreferredSlaves = ((slaves: Array) => IAddressFromResponse | null) | Array<{port: string, ip: string, prio?: number}> | {port: string, ip: string, prio?: number} +export { ISentinelAddress, SentinelIterator }; + export interface ISentinelConnectionOptions extends ITcpConnectionOptions { role: 'master' | 'slave' name: string @@ -39,12 +41,12 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { export default class SentinelConnector extends AbstractConnector { private retryAttempts: number - private sentinelIterator: SentinelIterator + protected sentinelIterator: SentinelIterator constructor (protected options: ISentinelConnectionOptions) { super() - if (this.options.sentinels.length === 0) { + if (!this.options.sentinels.length) { throw new Error('Requires at least one sentinel to connect to.') } if (!this.options.name) { @@ -68,19 +70,20 @@ export default class SentinelConnector extends AbstractConnector { return roleMatches } - public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { + public connect (eventEmitter: ErrorEmitter): Promise { this.connecting = true this.retryAttempts = 0 let lastError - const _this = this - connectToNext() - - function connectToNext() { - if (!_this.sentinelIterator.hasNext()) { - _this.sentinelIterator.reset(false) - const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function' - ? _this.options.sentinelRetryStrategy(++_this.retryAttempts) + const _Promise = PromiseContainer.get(); + + const connectToNext = () => new _Promise((resolve, reject) => { + const endpoint = this.sentinelIterator.next(); + + if (endpoint.done) { + this.sentinelIterator.reset(false) + const retryDelay = typeof this.options.sentinelRetryStrategy === 'function' + ? this.options.sentinelRetryStrategy(++this.retryAttempts) : null let errorMsg = typeof retryDelay !== 'number' @@ -95,32 +98,33 @@ export default class SentinelConnector extends AbstractConnector { const error = new Error(errorMsg) if (typeof retryDelay === 'number') { - setTimeout(connectToNext, retryDelay) + setTimeout(() => { + resolve(connectToNext()); + }, retryDelay) eventEmitter('error', error) } else { - callback(error) + reject(error) } return } - const endpoint = _this.sentinelIterator.next() - _this.resolve(endpoint, function (err, resolved) { - if (!_this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) + this.resolve(endpoint.value, (err, resolved) => { + if (!this.connecting) { + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) return } if (resolved) { debug('resolved: %s:%s', resolved.host, resolved.port) - if (_this.options.enableTLSForSentinelMode && _this.options.tls) { - Object.assign(resolved, _this.options.tls) - _this.stream = createTLSConnection(resolved) + if (this.options.enableTLSForSentinelMode && this.options.tls) { + Object.assign(resolved, this.options.tls) + this.stream = createTLSConnection(resolved) } else { - _this.stream = createConnection(resolved) + this.stream = createConnection(resolved) } - _this.sentinelIterator.reset(true) - callback(null, _this.stream) + this.sentinelIterator.reset(true) + resolve(this.stream) } else { - const endpointAddress = endpoint.host + ':' + endpoint.port + const endpointAddress = endpoint.value.host + ':' + endpoint.value.port const errorMsg = err ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved @@ -132,13 +136,15 @@ export default class SentinelConnector extends AbstractConnector { if (err) { lastError = err } - connectToNext() + resolve(connectToNext()) } }) - } + }); + + return connectToNext(); } - private updateSentinels (client, callback: NodeCallback): void { + private updateSentinels (client, callback: CallbackFunction): void { if (!this.options.updateSentinels) { return callback(null) @@ -167,7 +173,7 @@ export default class SentinelConnector extends AbstractConnector { }) } - private resolveMaster (client, callback: NodeCallback): void { + private resolveMaster (client, callback: CallbackFunction): void { client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => { if (err) { client.disconnect() @@ -186,7 +192,7 @@ export default class SentinelConnector extends AbstractConnector { }) } - private resolveSlave (client, callback: NodeCallback): void { + private resolveSlave (client, callback: CallbackFunction): void { client.sentinel('slaves', this.options.name, (err, result) => { client.disconnect() if (err) { @@ -214,7 +220,7 @@ export default class SentinelConnector extends AbstractConnector { return this.options.natMap[`${item.host}:${item.port}`] || item } - private resolve (endpoint, callback: NodeCallback): void { + private resolve (endpoint, callback: CallbackFunction): void { var client = new Redis({ port: endpoint.port || 26379, host: endpoint.host, diff --git a/lib/connectors/StandaloneConnector.ts b/lib/connectors/StandaloneConnector.ts index 27673b31..c9297c58 100644 --- a/lib/connectors/StandaloneConnector.ts +++ b/lib/connectors/StandaloneConnector.ts @@ -2,6 +2,7 @@ import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts} from 'net' import {connect as createTLSConnection, SecureContextOptions} from 'tls' import {CONNECTION_CLOSED_ERROR_MSG} from '../utils' import AbstractConnector, {ErrorEmitter} from './AbstractConnector' +import * as PromiseContainer from '../promiseContainer'; import {NetStream} from '../types' export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions { @@ -21,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector { super() } - public connect (callback: Function, _: ErrorEmitter) { + public connect (_: ErrorEmitter) { const {options} = this this.connecting = true @@ -46,27 +47,28 @@ export default class StandaloneConnector extends AbstractConnector { if (options.tls) { Object.assign(connectionOptions, options.tls) } + + const _Promise = PromiseContainer.get(); + return new _Promise((resolve, reject) => { + process.nextTick(() => { + if (!this.connecting) { + reject(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return + } - process.nextTick(() => { - if (!this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - - let stream: NetStream - try { - if (options.tls) { - stream = createTLSConnection(connectionOptions) - } else { - stream = createConnection(connectionOptions) + try { + if (options.tls) { + this.stream = createTLSConnection(connectionOptions) + } else { + this.stream = createConnection(connectionOptions) + } + } catch (err) { + reject(err) + return } - } catch (err) { - callback(err) - return - } - this.stream = stream - callback(null, stream) + resolve(this.stream) + }) }) } } diff --git a/lib/index.ts b/lib/index.ts index 0e58af23..eb33b9e9 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,10 +1,19 @@ exports = module.exports = require('./redis').default -export {ReplyError} from 'redis-errors' -export const Cluster = require('./cluster').default -export const Command = require('./command').default -export const ScanStream = require('./ScanStream').default -export const Pipeline = require('./pipeline').default +export {default} from './redis'; +export {default as Cluster} from './cluster' +export {default as Command} from './command' +export {default as ScanStream} from './ScanStream' +export {default as Pipeline} from './pipeline' +export {default as AbstractConnector} from './connectors/AbstractConnector' +export {default as SentinelConnector, SentinelIterator} from './connectors/SentinelConnector' + +// Type Exports +export {ISentinelAddress} from './connectors/SentinelConnector' +export {IRedisOptions} from './redis/RedisOptions' + +// No TS typings +export const ReplyError = require('redis-errors').ReplyError const PromiseContainer = require('./promiseContainer') Object.defineProperty(exports, 'Promise', { diff --git a/lib/redis/RedisOptions.ts b/lib/redis/RedisOptions.ts index 2e094124..8581faf6 100644 --- a/lib/redis/RedisOptions.ts +++ b/lib/redis/RedisOptions.ts @@ -1,10 +1,12 @@ import {ISentinelConnectionOptions} from '../connectors/SentinelConnector'; +import AbstractConnector from '../connectors/AbstractConnector'; import {IClusterOptions} from '../cluster/ClusterOptions'; import {ICommanderOptions} from '../commander'; export type ReconnectOnError = (err: Error) => boolean | 1 | 2; export interface IRedisOptions extends Partial, Partial, Partial { + connector?: AbstractConnector, retryStrategy?: (times: number) => number | void | null, keepAlive?: number, noDelay?: boolean, diff --git a/lib/redis/event_handler.ts b/lib/redis/event_handler.ts index 90734a67..35d1218d 100644 --- a/lib/redis/event_handler.ts +++ b/lib/redis/event_handler.ts @@ -54,7 +54,7 @@ export function connectHandler(self) { } } else { self.serverInfo = info; - if (self.connector.check(info)) { + if (self.options.connector.check(info)) { exports.readyHandler(self)(); } else { self.disconnect(true); diff --git a/lib/redis/index.ts b/lib/redis/index.ts index b3145829..7b283da5 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -132,10 +132,12 @@ function Redis() { this.resetCommandQueue(); this.resetOfflineQueue(); - if (this.options.sentinels) { - this.connector = new SentinelConnector(this.options); - } else { - this.connector = new StandaloneConnector(this.options); + if (!this.options.connector) { + if (this.options.sentinels) { + this.options.connector = new SentinelConnector(this.options); + } else { + this.options.connector = new StandaloneConnector(this.options); + } } this.retryAttempts = 0; @@ -232,8 +234,8 @@ Redis.prototype.setStatus = function (status, arg) { * @public */ Redis.prototype.connect = function (callback) { - var Promise = PromiseContainer.get(); - var promise = new Promise(function (resolve, reject) { + var _Promise = PromiseContainer.get(); + var promise = new _Promise((resolve, reject) => { if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') { reject(new Error('Redis is already connecting/connected')); return; @@ -249,7 +251,9 @@ Redis.prototype.connect = function (callback) { }; var _this = this; - this.connector.connect(function (err, stream) { + asCallback(options.connector.connect(function (type, err) { + _this.silentEmit(type, err); + }), function (err, stream) { if (err) { _this.flushQueue(err); _this.silentEmit('error', err); @@ -316,10 +320,8 @@ Redis.prototype.connect = function (callback) { }; _this.once('ready', connectionReadyHandler); _this.once('close', connectionCloseHandler); - }, function (type, err) { - _this.silentEmit(type, err); }); - }.bind(this)) + }) return asCallback(promise, callback) }; @@ -343,7 +345,7 @@ Redis.prototype.disconnect = function (reconnect) { if (this.status === 'wait') { eventHandler.closeHandler(this)(); } else { - this.connector.disconnect(); + this.options.connector.disconnect(); } }; diff --git a/package.json b/package.json index 43e4c8ca..01f6443d 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,8 @@ "built/" ], "scripts": { - "test": "NODE_ENV=test mocha test/**/*.ts", - "test:cov": "NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", + "test": "TS_NODE_LOG_ERROR=true NODE_ENV=test mocha test/**/*.ts", + "test:cov": "TS_NODE_LOG_ERROR=true NODE_ENV=test node ./node_modules/istanbul/lib/cli.js cover --preserve-comments ./node_modules/mocha/bin/_mocha -- -r ts-node/register -R spec --exit test/**/*.ts", "build": "rm -rf built && tsc", "prepublishOnly": "npm run build && npm test", "bench": "matcha benchmarks/*.js",