Skip to content

Commit

Permalink
feat: support custom connectors (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
imatlopez authored and luin committed Jun 25, 2019
1 parent 224df78 commit bf3fe29
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 84 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"env": {
"node": true
"node": true,
"es6": true
},
"rules": {
"no-const-assign": 2,
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
- redis-server

script:
- npm run build
- npm run test:cov || npm run test:cov || npm run test:cov

env:
Expand Down
4 changes: 1 addition & 3 deletions examples/basic_operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ redis.sadd('set', [1, 3, 5, 7]);
redis.set('key', 100, 'EX', 10);

// Change the server configuration
redis.config('set', 'notify-keyspace-events', 'KEA')


redis.config('set', 'notify-keyspace-events', 'KEA');
53 changes: 53 additions & 0 deletions examples/custom_connector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

const Redis = require('ioredis');
const MyService = require('path/to/my/service');

// Create a custom connector that fetches sentinels from an external call
class AsyncSentinelConnector extends Redis.SentinelConnector {
constructor(options = {}) {
// Placeholder
options.sentinels = options.sentinels || [{ host: 'localhost', port: 6379 }];

// SentinelConnector saves options as its property
super(options);
}

connect(eventEmitter) {
return MyService.getSentinels().then(sentinels => {
this.options.sentinels = sentinels;
this.sentinelIterator = new Redis.SentinelIterator(sentinels);
return Redis.SentinelConnector.prototype.connect.call(this, eventEmitter);
});
}
}

const redis = new Redis({
connector: new AsyncSentinelConnector()
});

// ioredis supports all Redis commands:
redis.set('foo', 'bar');
redis.get('foo', function (err, result) {
if (err) {
console.error(err);
} else {
console.log(result);
}
});
redis.del('foo');

// Or using a promise if the last argument isn't a function
redis.get('foo').then(function (result) {
console.log(result);
});

// Arguments to commands are flattened, so the following are the same:
redis.sadd('set', 1, 3, 5, 7);
redis.sadd('set', [1, 3, 5, 7]);

// All arguments are passed directly to the redis server:
redis.set('key', 100, 'EX', 10);

// Change the server configuration
redis.config('set', 'notify-keyspace-events', 'KEA');
2 changes: 1 addition & 1 deletion lib/connectors/AbstractConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ export default abstract class AbstractConnector {
}
}

public abstract connect (callback: Function, _: ErrorEmitter)
public abstract connect (_: ErrorEmitter): Promise<NetStream>
}
14 changes: 5 additions & 9 deletions lib/connectors/SentinelConnector/SentinelIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ function isSentinelEql (a: Partial<ISentinelAddress>, b: Partial<ISentinelAddres
((a.port || 26379) === (b.port || 26379))
}

export default class SentinelIterator {
export default class SentinelIterator implements Iterator<Partial<ISentinelAddress>> {
private cursor: number = 0

constructor (private sentinels: Partial<ISentinelAddress>[]) {}

hasNext (): boolean {
return this.cursor < this.sentinels.length
}

next (): Partial<ISentinelAddress> | null {
return this.hasNext() ? this.sentinels[this.cursor++] : null
next () {
const done = this.cursor >= this.sentinels.length
return { done, value: done ? undefined : this.sentinels[this.cursor++] }
}

reset (moveCurrentEndpointToFirst: boolean): void {
if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) {
const remains = this.sentinels.slice(this.cursor - 1)
this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1))
this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1))
}
this.cursor = 0
}
Expand Down
70 changes: 38 additions & 32 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConne
import SentinelIterator from './SentinelIterator'
import {ISentinelAddress} from './types';
import AbstractConnector, { ErrorEmitter } from '../AbstractConnector'
import {NetStream} from '../../types'
import {NetStream, CallbackFunction} from '../../types';
import * as PromiseContainer from '../../promiseContainer';
import Redis from '../../redis'

const debug = Debug('SentinelConnector')
Expand All @@ -17,12 +18,13 @@ interface IAddressFromResponse {
flags?: string
}

type NodeCallback<T = void> = (err: Error | null, result?: T) => void
type PreferredSlaves =
((slaves: Array<IAddressFromResponse>) => IAddressFromResponse | null) |
Array<{port: string, ip: string, prio?: number}> |
{port: string, ip: string, prio?: number}

export { ISentinelAddress, SentinelIterator };

export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
role: 'master' | 'slave'
name: string
Expand All @@ -39,12 +41,12 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {

export default class SentinelConnector extends AbstractConnector {
private retryAttempts: number
private sentinelIterator: SentinelIterator
protected sentinelIterator: SentinelIterator

constructor (protected options: ISentinelConnectionOptions) {
super()

if (this.options.sentinels.length === 0) {
if (!this.options.sentinels.length) {
throw new Error('Requires at least one sentinel to connect to.')
}
if (!this.options.name) {
Expand All @@ -68,19 +70,20 @@ export default class SentinelConnector extends AbstractConnector {
return roleMatches
}

public connect (callback: NodeCallback<NetStream>, eventEmitter: ErrorEmitter): void {
public connect (eventEmitter: ErrorEmitter): Promise<NetStream> {
this.connecting = true
this.retryAttempts = 0

let lastError
const _this = this
connectToNext()

function connectToNext() {
if (!_this.sentinelIterator.hasNext()) {
_this.sentinelIterator.reset(false)
const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function'
? _this.options.sentinelRetryStrategy(++_this.retryAttempts)
const _Promise = PromiseContainer.get();

const connectToNext = () => new _Promise<NetStream>((resolve, reject) => {
const endpoint = this.sentinelIterator.next();

if (endpoint.done) {
this.sentinelIterator.reset(false)
const retryDelay = typeof this.options.sentinelRetryStrategy === 'function'
? this.options.sentinelRetryStrategy(++this.retryAttempts)
: null

let errorMsg = typeof retryDelay !== 'number'
Expand All @@ -95,32 +98,33 @@ export default class SentinelConnector extends AbstractConnector {

const error = new Error(errorMsg)
if (typeof retryDelay === 'number') {
setTimeout(connectToNext, retryDelay)
setTimeout(() => {
resolve(connectToNext());
}, retryDelay)
eventEmitter('error', error)
} else {
callback(error)
reject(error)
}
return
}

const endpoint = _this.sentinelIterator.next()
_this.resolve(endpoint, function (err, resolved) {
if (!_this.connecting) {
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
this.resolve(endpoint.value, (err, resolved) => {
if (!this.connecting) {
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}
if (resolved) {
debug('resolved: %s:%s', resolved.host, resolved.port)
if (_this.options.enableTLSForSentinelMode && _this.options.tls) {
Object.assign(resolved, _this.options.tls)
_this.stream = createTLSConnection(resolved)
if (this.options.enableTLSForSentinelMode && this.options.tls) {
Object.assign(resolved, this.options.tls)
this.stream = createTLSConnection(resolved)
} else {
_this.stream = createConnection(resolved)
this.stream = createConnection(resolved)
}
_this.sentinelIterator.reset(true)
callback(null, _this.stream)
this.sentinelIterator.reset(true)
resolve(this.stream)
} else {
const endpointAddress = endpoint.host + ':' + endpoint.port
const endpointAddress = endpoint.value.host + ':' + endpoint.value.port
const errorMsg = err
? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message
: 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved
Expand All @@ -132,13 +136,15 @@ export default class SentinelConnector extends AbstractConnector {
if (err) {
lastError = err
}
connectToNext()
resolve(connectToNext())
}
})
}
});

return connectToNext();
}

private updateSentinels (client, callback: NodeCallback): void {
private updateSentinels (client, callback: CallbackFunction): void {

if (!this.options.updateSentinels) {
return callback(null)
Expand Down Expand Up @@ -167,7 +173,7 @@ export default class SentinelConnector extends AbstractConnector {
})
}

private resolveMaster (client, callback: NodeCallback<ITcpConnectionOptions>): void {
private resolveMaster (client, callback: CallbackFunction<ITcpConnectionOptions>): void {
client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => {
if (err) {
client.disconnect()
Expand All @@ -186,7 +192,7 @@ export default class SentinelConnector extends AbstractConnector {
})
}

private resolveSlave (client, callback: NodeCallback<ITcpConnectionOptions | null>): void {
private resolveSlave (client, callback: CallbackFunction<ITcpConnectionOptions | null>): void {
client.sentinel('slaves', this.options.name, (err, result) => {
client.disconnect()
if (err) {
Expand Down Expand Up @@ -214,7 +220,7 @@ export default class SentinelConnector extends AbstractConnector {
return this.options.natMap[`${item.host}:${item.port}`] || item
}

private resolve (endpoint, callback: NodeCallback<ITcpConnectionOptions>): void {
private resolve (endpoint, callback: CallbackFunction<ITcpConnectionOptions>): void {
var client = new Redis({
port: endpoint.port || 26379,
host: endpoint.host,
Expand Down
40 changes: 21 additions & 19 deletions lib/connectors/StandaloneConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts} from 'net'
import {connect as createTLSConnection, SecureContextOptions} from 'tls'
import {CONNECTION_CLOSED_ERROR_MSG} from '../utils'
import AbstractConnector, {ErrorEmitter} from './AbstractConnector'
import * as PromiseContainer from '../promiseContainer';
import {NetStream} from '../types'

export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions {
Expand All @@ -21,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector {
super()
}

public connect (callback: Function, _: ErrorEmitter) {
public connect (_: ErrorEmitter) {
const {options} = this
this.connecting = true

Expand All @@ -46,27 +47,28 @@ export default class StandaloneConnector extends AbstractConnector {
if (options.tls) {
Object.assign(connectionOptions, options.tls)
}

const _Promise = PromiseContainer.get();
return new _Promise<NetStream>((resolve, reject) => {
process.nextTick(() => {
if (!this.connecting) {
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}

process.nextTick(() => {
if (!this.connecting) {
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}

let stream: NetStream
try {
if (options.tls) {
stream = createTLSConnection(connectionOptions)
} else {
stream = createConnection(connectionOptions)
try {
if (options.tls) {
this.stream = createTLSConnection(connectionOptions)
} else {
this.stream = createConnection(connectionOptions)
}
} catch (err) {
reject(err)
return
}
} catch (err) {
callback(err)
return
}

this.stream = stream
callback(null, stream)
resolve(this.stream)
})
})
}
}
19 changes: 14 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
exports = module.exports = require('./redis').default

export {ReplyError} from 'redis-errors'
export const Cluster = require('./cluster').default
export const Command = require('./command').default
export const ScanStream = require('./ScanStream').default
export const Pipeline = require('./pipeline').default
export {default} from './redis';
export {default as Cluster} from './cluster'
export {default as Command} from './command'
export {default as ScanStream} from './ScanStream'
export {default as Pipeline} from './pipeline'
export {default as AbstractConnector} from './connectors/AbstractConnector'
export {default as SentinelConnector, SentinelIterator} from './connectors/SentinelConnector'

// Type Exports
export {ISentinelAddress} from './connectors/SentinelConnector'
export {IRedisOptions} from './redis/RedisOptions'

// No TS typings
export const ReplyError = require('redis-errors').ReplyError

const PromiseContainer = require('./promiseContainer')
Object.defineProperty(exports, 'Promise', {
Expand Down
2 changes: 2 additions & 0 deletions lib/redis/RedisOptions.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {ISentinelConnectionOptions} from '../connectors/SentinelConnector';
import AbstractConnector from '../connectors/AbstractConnector';
import {IClusterOptions} from '../cluster/ClusterOptions';
import {ICommanderOptions} from '../commander';

export type ReconnectOnError = (err: Error) => boolean | 1 | 2;

export interface IRedisOptions extends Partial<ISentinelConnectionOptions>, Partial<ICommanderOptions>, Partial<IClusterOptions> {
connector?: AbstractConnector,
retryStrategy?: (times: number) => number | void | null,
keepAlive?: number,
noDelay?: boolean,
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function connectHandler(self) {
}
} else {
self.serverInfo = info;
if (self.connector.check(info)) {
if (self.options.connector.check(info)) {
exports.readyHandler(self)();
} else {
self.disconnect(true);
Expand Down
Loading

0 comments on commit bf3fe29

Please sign in to comment.