Skip to content

Commit

Permalink
Merge pull request ortoo#25 from ldm5180/pubsub-switchmaster
Browse files Browse the repository at this point in the history
Pub/Sub subscribe to +switch-master events.
  • Loading branch information
jamessharp committed Dec 9, 2014
2 parents daaca10 + 2acd9ac commit 1a493ad
Showing 1 changed file with 54 additions and 4 deletions.
58 changes: 54 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ function Sentinel(endpoints) {
}

this.endpoints = endpoints;
this.clients = [];
this.pubsub = [];
}

/**
Expand All @@ -19,7 +21,34 @@ function Sentinel(endpoints) {
* @return {RedisClient} the RedisClient for the desired endpoint
*/
Sentinel.prototype.createClient = function(masterName, opts) {
// When the client is ready create another client and subscribe to the
// switch-master event. Then any time there is a message on the channel it
// must be a master change, so reconnect all clients. This avoids combining
// the pub/sub client with the normal client and interfering with whatever
// the user is trying to do.
if (this.pubsub.length == 0) {
var self = this;
var pubsubOpts = {};
pubsubOpts.role = "sentinel";
pubsubClient = this.createClientInternal(masterName, pubsubOpts);
pubsubClient.subscribe("+switch-master", function(error) {
if (error) {
console.error("Unable to subscribe to Sentinel PUBSUB",
host, ":", port);
}
});
pubsubClient.on("message", function(channel, message) {
console.warn("Received +switch-master message from Redis Sentinel.",
" Reconnecting clients.");
self.reconnectAllClients();
});
pubsubClient.on("error", function(error) {});
self.pubsub.push(pubsubClient);
}
return this.createClientInternal(masterName, opts);
}

Sentinel.prototype.createClientInternal = function(masterName, opts) {
if (typeof masterName !== 'string') {
opts = masterName;
masterName = 'mymaster';
Expand All @@ -33,6 +62,7 @@ Sentinel.prototype.createClient = function(masterName, opts) {

var netClient = new net.Socket();
var client = new redis.RedisClient(netClient, opts);
this.clients.push(client);

var self = this;

Expand Down Expand Up @@ -67,11 +97,17 @@ Sentinel.prototype.createClient = function(masterName, opts) {
client.on('reconnecting', refreshEndpoints);

function refreshEndpoints() {
client.connectionOption.port = "";
client.connectionOption.host = "";
resolver(self.endpoints, masterName, function(_err, ip, port) {
if (_err) { oldEmit.call(client, 'error', _err); }
// Try and reconnect
client.connectionOption.port = port;
client.connectionOption.host = ip;
if (_err) {
oldEmit.call(client, 'error', _err);
} else {
// Try reconnecting.
client.connectionOption.port = port;
client.connectionOption.host = ip;
client.connection_gone("sentinel induced refresh");
}
});
}

Expand Down Expand Up @@ -112,6 +148,20 @@ Sentinel.prototype.createClient = function(masterName, opts) {
return client;
};


/*
* Ensure that all clients are trying to reconnect.
*/
Sentinel.prototype.reconnectAllClients = function() {
this.clients.forEach(function(client) {
// It is safe to call this multiple times in quick succession, as
// might happen with multiple Sentinel instances. Each client
// records its reconnect state and will only try to reconnect if
// not already doing so.
client.connection_gone("sentinel switch-master");
});
};

function resolveClient() {
var _i, __slice = [].slice;

Expand Down

0 comments on commit 1a493ad

Please sign in to comment.