Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster lazy connect #352

Merged
merged 5 commits into from
Jul 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/cluster/connection_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
redis = this.nodes.all[node.key];
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly;
redis[readOnly ? 'readonly' : 'readwrite']().catch(function () {});
redis[readOnly ? 'readonly' : 'readwrite']().catch(_.noop);
if (readOnly) {
delete this.nodes.master[node.key];
this.nodes.slave[node.key] = redis;
Expand Down
77 changes: 53 additions & 24 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ function Cluster(startupNodes, options) {

this.subscriber = null;

this.connect().catch(noop);
if (this.options.lazyConnect) {
this.setStatus('wait');
} else {
this.connect().catch(_.noop);
}
}

/**
Expand Down Expand Up @@ -161,24 +165,7 @@ Cluster.prototype.connect = function () {

this.once('refresh', refreshListener);
this.once('close', closeListener);

this.once('close', function () {
var retryDelay;
if (!this.manuallyClosing && typeof this.options.clusterRetryStrategy === 'function') {
retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts);
}
if (typeof retryDelay === 'number') {
this.setStatus('reconnecting');
this.reconnectTimeout = setTimeout(function () {
this.reconnectTimeout = null;
debug('Cluster is disconnected. Retrying after %dms', retryDelay);
this.connect().catch(noop);
}.bind(this), retryDelay);
} else {
this.setStatus('end');
this.flushQueue(new Error('None of startup nodes is available'));
}
});
this.once('close', this._handleCloseEvent.bind(this));

this.refreshSlotsCache(function (err) {
if (err && err.message === 'Failed to refresh slots cache.') {
Expand All @@ -190,12 +177,34 @@ Cluster.prototype.connect = function () {
}.bind(this));
};

/**
* Called when closed to check whether a reconnection should be made
*/
Cluster.prototype._handleCloseEvent = function () {
var retryDelay;
if (!this.manuallyClosing && typeof this.options.clusterRetryStrategy === 'function') {
retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts);
}
if (typeof retryDelay === 'number') {
this.setStatus('reconnecting');
this.reconnectTimeout = setTimeout(function () {
this.reconnectTimeout = null;
debug('Cluster is disconnected. Retrying after %dms', retryDelay);
this.connect().catch(_.noop);
}.bind(this), retryDelay);
} else {
this.setStatus('end');
this.flushQueue(new Error('None of startup nodes is available'));
}
};

/**
* Disconnect from every node in the cluster.
*
* @public
*/
Cluster.prototype.disconnect = function (reconnect) {
var status = this.status;
this.setStatus('disconnecting');

if (!reconnect) {
Expand All @@ -205,7 +214,13 @@ Cluster.prototype.disconnect = function (reconnect) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
this.connectionPool.reset([]);

if (status === 'wait') {
this.setStatus('close');
this._handleCloseEvent();
} else {
this.connectionPool.reset([]);
}
};

/**
Expand All @@ -216,6 +231,7 @@ Cluster.prototype.disconnect = function (reconnect) {
* @public
*/
Cluster.prototype.quit = function (callback) {
var status = this.status;
this.setStatus('disconnecting');

this.manuallyClosing = true;
Expand All @@ -224,6 +240,18 @@ Cluster.prototype.quit = function (callback) {
clearTimeout(this.reconnectTimeout);
this.reconnectTimeout = null;
}
if (status === 'wait') {
var ret = Promise.resolve('OK').nodeify(callback);

// use setImmediate to make sure "close" event
// being emitted after quit() is returned
setImmediate(function () {
this.setStatus('close');
this._handleCloseEvent();
}.bind(this));

return ret;
}
return Promise.all(this.nodes().map(function (node) {
return node.quit();
})).then(function () {
Expand Down Expand Up @@ -277,12 +305,12 @@ Cluster.prototype.selectSubscriber = function () {
if (!--pending) {
_this.lastActiveSubscriber = _this.subscriber;
}
}).catch(noop);
}).catch(_.noop);
}
});
} else {
if (this.subscriber.status === 'wait') {
this.subscriber.connect().catch(noop);
this.subscriber.connect().catch(_.noop);
}
this.lastActiveSubscriber = this.subscriber;
}
Expand Down Expand Up @@ -389,6 +417,9 @@ Cluster.prototype.executeOfflineCommands = function () {
};

Cluster.prototype.sendCommand = function (command, stream, node) {
if (this.status === 'wait') {
this.connect().catch(_.noop);
}
if (this.status === 'end') {
command.reject(new Error(utils.CONNECTION_CLOSED_ERROR_MSG));
return command.promise;
Expand Down Expand Up @@ -627,6 +658,4 @@ Cluster.prototype._readyCheck = function (callback) {

require('../transaction').addTransactionSupport(Cluster.prototype);

function noop() {}

module.exports = Cluster;
4 changes: 2 additions & 2 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ function Redis() {
if (this.options.lazyConnect) {
this.setStatus('wait');
} else {
this.connect().catch(function () {});
this.connect().catch(_.noop);
}
}

Expand Down Expand Up @@ -544,7 +544,7 @@ require('./transaction').addTransactionSupport(Redis.prototype);
*/
Redis.prototype.sendCommand = function (command, stream) {
if (this.status === 'wait') {
this.connect().catch(function () {});
this.connect().catch(_.noop);
}
if (this.status === 'end') {
command.reject(new Error(utils.CONNECTION_CLOSED_ERROR_MSG));
Expand Down
5 changes: 3 additions & 2 deletions lib/redis/event_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var debug = require('debug')('ioredis:connection');
var Command = require('../command');
var utils = require('../utils');
var _ = require('lodash');

exports.connectHandler = function (self) {
return function () {
Expand Down Expand Up @@ -91,7 +92,7 @@ exports.closeHandler = function (self) {
self.setStatus('reconnecting', retryDelay);
self.reconnectTimeout = setTimeout(function () {
self.reconnectTimeout = null;
self.connect().catch(function () {});
self.connect().catch(_.noop);
}, retryDelay);
};

Expand Down Expand Up @@ -145,7 +146,7 @@ exports.readyHandler = function (self) {

if (self.options.readOnly) {
debug('set the connection to readonly mode');
self.readonly().catch(function () {});
self.readonly().catch(_.noop);
}

if (self.prevCondition) {
Expand Down
51 changes: 51 additions & 0 deletions test/functional/lazy_connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,55 @@ describe('lazy connect', function () {
});
redis.disconnect();
});

describe('Cluster', function () {
it('should not call `connect` when init', function () {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
new Redis.Cluster([], { lazyConnect: true });
Redis.Cluster.prototype.connect.restore();
});

it('should quit before "close" being emited', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], { lazyConnect: true });
cluster.quit(function () {
cluster.once('close', function () {
cluster.once('end', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});
});

it('should disconnect before "close" being emited', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], { lazyConnect: true });
cluster.disconnect();
cluster.once('close', function () {
cluster.once('end', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});

it('should support disconnecting with reconnect', function (done) {
stub(Redis.Cluster.prototype, 'connect').throws(new Error('`connect` should not be called'));
var cluster = new Redis.Cluster([], {
lazyConnect: true,
clusterRetryStrategy: function () {
return 1;
}
});
cluster.disconnect(true);
cluster.once('close', function () {
Redis.Cluster.prototype.connect.restore();
stub(Redis.Cluster.prototype, 'connect', function () {
Redis.Cluster.prototype.connect.restore();
done();
});
});
});
});
});