Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom connectors and additional typings #906

Merged
merged 20 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"env": {
"node": true
"node": true,
"es6": true
},
"rules": {
"no-const-assign": 2,
Expand Down
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
- redis-server

script:
- npm run build
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why npm run build is needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test may have type errors like trying to access private properties, but the code itself is fine, so this guards against build time errors while the tests are for logical errors only.

- npm run test:cov || npm run test:cov || npm run test:cov

env:
Expand Down
4 changes: 1 addition & 3 deletions examples/basic_operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,4 @@ redis.sadd('set', [1, 3, 5, 7]);
redis.set('key', 100, 'EX', 10);

// Change the server configuration
redis.config('set', 'notify-keyspace-events', 'KEA')


redis.config('set', 'notify-keyspace-events', 'KEA');
53 changes: 53 additions & 0 deletions examples/custom_connector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

const Redis = require('ioredis');
const MyService = require('path/to/my/service');

// Create a custom connector that fetches sentinels from an external call
class AsyncSentinelConnector extends Redis.SentinelConnector {
constructor(options = {}) {
// Placeholder
options.sentinels = options.sentinels || [{ host: 'localhost', port: 6379 }];

// SentinelConnector saves options as its property
super(options);
}

connect(eventEmitter) {
return MyService.getSentinels().then(sentinels => {
this.options.sentinels = sentinels;
this.sentinelIterator = new Redis.SentinelIterator(sentinels);
return Redis.SentinelConnector.prototype.connect.call(this, eventEmitter);
});
}
}

const redis = new Redis({
connector: new AsyncSentinelConnector()
});

// ioredis supports all Redis commands:
redis.set('foo', 'bar');
redis.get('foo', function (err, result) {
if (err) {
console.error(err);
} else {
console.log(result);
}
});
redis.del('foo');

// Or using a promise if the last argument isn't a function
redis.get('foo').then(function (result) {
console.log(result);
});

// Arguments to commands are flattened, so the following are the same:
redis.sadd('set', 1, 3, 5, 7);
redis.sadd('set', [1, 3, 5, 7]);

// All arguments are passed directly to the redis server:
redis.set('key', 100, 'EX', 10);

// Change the server configuration
redis.config('set', 'notify-keyspace-events', 'KEA');
2 changes: 1 addition & 1 deletion lib/connectors/AbstractConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ export default abstract class AbstractConnector {
}
}

public abstract connect (callback: Function, _: ErrorEmitter)
public abstract connect (_: ErrorEmitter): Promise<NetStream>
}
14 changes: 5 additions & 9 deletions lib/connectors/SentinelConnector/SentinelIterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,19 @@ function isSentinelEql (a: Partial<ISentinelAddress>, b: Partial<ISentinelAddres
((a.port || 26379) === (b.port || 26379))
}

export default class SentinelIterator {
export default class SentinelIterator implements Iterator<Partial<ISentinelAddress>> {
private cursor: number = 0

constructor (private sentinels: Partial<ISentinelAddress>[]) {}

hasNext (): boolean {
return this.cursor < this.sentinels.length
}

next (): Partial<ISentinelAddress> | null {
return this.hasNext() ? this.sentinels[this.cursor++] : null
next () {
const done = this.cursor >= this.sentinels.length
return { done, value: done ? undefined : this.sentinels[this.cursor++] }
}

reset (moveCurrentEndpointToFirst: boolean): void {
if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) {
const remains = this.sentinels.slice(this.cursor - 1)
this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1))
this.sentinels.unshift(...this.sentinels.splice(this.cursor - 1))
}
this.cursor = 0
}
Expand Down
70 changes: 38 additions & 32 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConne
import SentinelIterator from './SentinelIterator'
import {ISentinelAddress} from './types';
import AbstractConnector, { ErrorEmitter } from '../AbstractConnector'
import {NetStream} from '../../types'
import {NetStream, CallbackFunction} from '../../types';
import * as PromiseContainer from '../../promiseContainer';
import Redis from '../../redis'

const debug = Debug('SentinelConnector')
Expand All @@ -17,12 +18,13 @@ interface IAddressFromResponse {
flags?: string
}

type NodeCallback<T = void> = (err: Error | null, result?: T) => void
type PreferredSlaves =
((slaves: Array<IAddressFromResponse>) => IAddressFromResponse | null) |
Array<{port: string, ip: string, prio?: number}> |
{port: string, ip: string, prio?: number}

export { ISentinelAddress, SentinelIterator };

export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
role: 'master' | 'slave'
name: string
Expand All @@ -39,12 +41,12 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {

export default class SentinelConnector extends AbstractConnector {
private retryAttempts: number
private sentinelIterator: SentinelIterator
protected sentinelIterator: SentinelIterator

constructor (protected options: ISentinelConnectionOptions) {
super()

if (this.options.sentinels.length === 0) {
if (!this.options.sentinels.length) {
throw new Error('Requires at least one sentinel to connect to.')
}
if (!this.options.name) {
Expand All @@ -68,19 +70,20 @@ export default class SentinelConnector extends AbstractConnector {
return roleMatches
}

public connect (callback: NodeCallback<NetStream>, eventEmitter: ErrorEmitter): void {
public connect (eventEmitter: ErrorEmitter): Promise<NetStream> {
this.connecting = true
this.retryAttempts = 0

let lastError
const _this = this
connectToNext()

function connectToNext() {
if (!_this.sentinelIterator.hasNext()) {
_this.sentinelIterator.reset(false)
const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function'
? _this.options.sentinelRetryStrategy(++_this.retryAttempts)
const _Promise = PromiseContainer.get();

const connectToNext = () => new _Promise<NetStream>((resolve, reject) => {
const endpoint = this.sentinelIterator.next();

if (endpoint.done) {
this.sentinelIterator.reset(false)
const retryDelay = typeof this.options.sentinelRetryStrategy === 'function'
? this.options.sentinelRetryStrategy(++this.retryAttempts)
: null

let errorMsg = typeof retryDelay !== 'number'
Expand All @@ -95,32 +98,33 @@ export default class SentinelConnector extends AbstractConnector {

const error = new Error(errorMsg)
if (typeof retryDelay === 'number') {
setTimeout(connectToNext, retryDelay)
setTimeout(() => {
resolve(connectToNext());
}, retryDelay)
eventEmitter('error', error)
} else {
callback(error)
reject(error)
}
return
}

const endpoint = _this.sentinelIterator.next()
_this.resolve(endpoint, function (err, resolved) {
if (!_this.connecting) {
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
this.resolve(endpoint.value, (err, resolved) => {
if (!this.connecting) {
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}
if (resolved) {
debug('resolved: %s:%s', resolved.host, resolved.port)
if (_this.options.enableTLSForSentinelMode && _this.options.tls) {
Object.assign(resolved, _this.options.tls)
_this.stream = createTLSConnection(resolved)
if (this.options.enableTLSForSentinelMode && this.options.tls) {
Object.assign(resolved, this.options.tls)
this.stream = createTLSConnection(resolved)
} else {
_this.stream = createConnection(resolved)
this.stream = createConnection(resolved)
}
_this.sentinelIterator.reset(true)
callback(null, _this.stream)
this.sentinelIterator.reset(true)
resolve(this.stream)
} else {
const endpointAddress = endpoint.host + ':' + endpoint.port
const endpointAddress = endpoint.value.host + ':' + endpoint.value.port
const errorMsg = err
? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message
: 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved
Expand All @@ -132,13 +136,15 @@ export default class SentinelConnector extends AbstractConnector {
if (err) {
lastError = err
}
connectToNext()
resolve(connectToNext())
}
})
}
});

return connectToNext();
}

private updateSentinels (client, callback: NodeCallback): void {
private updateSentinels (client, callback: CallbackFunction): void {

if (!this.options.updateSentinels) {
return callback(null)
Expand Down Expand Up @@ -167,7 +173,7 @@ export default class SentinelConnector extends AbstractConnector {
})
}

private resolveMaster (client, callback: NodeCallback<ITcpConnectionOptions>): void {
private resolveMaster (client, callback: CallbackFunction<ITcpConnectionOptions>): void {
client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => {
if (err) {
client.disconnect()
Expand All @@ -186,7 +192,7 @@ export default class SentinelConnector extends AbstractConnector {
})
}

private resolveSlave (client, callback: NodeCallback<ITcpConnectionOptions | null>): void {
private resolveSlave (client, callback: CallbackFunction<ITcpConnectionOptions | null>): void {
client.sentinel('slaves', this.options.name, (err, result) => {
client.disconnect()
if (err) {
Expand Down Expand Up @@ -214,7 +220,7 @@ export default class SentinelConnector extends AbstractConnector {
return this.options.natMap[`${item.host}:${item.port}`] || item
}

private resolve (endpoint, callback: NodeCallback<ITcpConnectionOptions>): void {
private resolve (endpoint, callback: CallbackFunction<ITcpConnectionOptions>): void {
var client = new Redis({
port: endpoint.port || 26379,
host: endpoint.host,
Expand Down
40 changes: 21 additions & 19 deletions lib/connectors/StandaloneConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {createConnection, TcpNetConnectOpts, IpcNetConnectOpts} from 'net'
import {connect as createTLSConnection, SecureContextOptions} from 'tls'
import {CONNECTION_CLOSED_ERROR_MSG} from '../utils'
import AbstractConnector, {ErrorEmitter} from './AbstractConnector'
import * as PromiseContainer from '../promiseContainer';
import {NetStream} from '../types'

export function isIIpcConnectionOptions (value: any): value is IIpcConnectionOptions {
Expand All @@ -21,7 +22,7 @@ export default class StandaloneConnector extends AbstractConnector {
super()
}

public connect (callback: Function, _: ErrorEmitter) {
public connect (_: ErrorEmitter) {
const {options} = this
this.connecting = true

Expand All @@ -46,27 +47,28 @@ export default class StandaloneConnector extends AbstractConnector {
if (options.tls) {
Object.assign(connectionOptions, options.tls)
}

const _Promise = PromiseContainer.get();
return new _Promise<NetStream>((resolve, reject) => {
process.nextTick(() => {
if (!this.connecting) {
reject(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}

process.nextTick(() => {
if (!this.connecting) {
callback(new Error(CONNECTION_CLOSED_ERROR_MSG))
return
}

let stream: NetStream
try {
if (options.tls) {
stream = createTLSConnection(connectionOptions)
} else {
stream = createConnection(connectionOptions)
try {
if (options.tls) {
this.stream = createTLSConnection(connectionOptions)
} else {
this.stream = createConnection(connectionOptions)
}
} catch (err) {
reject(err)
return
}
} catch (err) {
callback(err)
return
}

this.stream = stream
callback(null, stream)
resolve(this.stream)
})
})
}
}
19 changes: 14 additions & 5 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
exports = module.exports = require('./redis').default

export {ReplyError} from 'redis-errors'
export const Cluster = require('./cluster').default
export const Command = require('./command').default
export const ScanStream = require('./ScanStream').default
export const Pipeline = require('./pipeline').default
export {default} from './redis';
export {default as Cluster} from './cluster'
export {default as Command} from './command'
export {default as ScanStream} from './ScanStream'
export {default as Pipeline} from './pipeline'
export {default as AbstractConnector} from './connectors/AbstractConnector'
export {default as SentinelConnector, SentinelIterator} from './connectors/SentinelConnector'

// Type Exports
export {ISentinelAddress} from './connectors/SentinelConnector'
export {IRedisOptions} from './redis/RedisOptions'

// No TS typings
export const ReplyError = require('redis-errors').ReplyError

const PromiseContainer = require('./promiseContainer')
Object.defineProperty(exports, 'Promise', {
Expand Down
2 changes: 2 additions & 0 deletions lib/redis/RedisOptions.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {ISentinelConnectionOptions} from '../connectors/SentinelConnector';
import AbstractConnector from '../connectors/AbstractConnector';
import {IClusterOptions} from '../cluster/ClusterOptions';
import {ICommanderOptions} from '../commander';

export type ReconnectOnError = (err: Error) => boolean | 1 | 2;

export interface IRedisOptions extends Partial<ISentinelConnectionOptions>, Partial<ICommanderOptions>, Partial<IClusterOptions> {
connector?: AbstractConnector,
retryStrategy?: (times: number) => number | void | null,
keepAlive?: number,
noDelay?: boolean,
Expand Down
2 changes: 1 addition & 1 deletion lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export function connectHandler(self) {
}
} else {
self.serverInfo = info;
if (self.connector.check(info)) {
if (self.options.connector.check(info)) {
exports.readyHandler(self)();
} else {
self.disconnect(true);
Expand Down
Loading