Skip to content

Commit

Permalink
Fix unfulfilled commands being sent to the wrong db. Close #42
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed May 21, 2015
1 parent 89ead91 commit dd2ad38
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 45 deletions.
5 changes: 5 additions & 0 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ Pipeline.prototype.exec = function (callback) {
} else {
_this.redis.stream.write(data);
}

// Reset writePending for resending
writePending = _this._queue.length;
data = '';
bufferMode = false;
}
}
};
Expand Down
11 changes: 7 additions & 4 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,9 @@ Redis.prototype.flushQueue = function (error) {
item.command.reject(error);
}

var command;
while (this.commandQueue.length > 0) {
command = this.commandQueue.shift();
command.reject(error);
item = this.commandQueue.shift();
item.command.reject(error);
}
};

Expand Down Expand Up @@ -485,7 +484,11 @@ Redis.prototype.sendCommand = function (command, stream) {
debug('write command[%d] -> %s(%s)', this.condition.select, command.name, command.args);
(stream || this.stream).write(command.toWritable());

this.commandQueue.push(command);
this.commandQueue.push({
command: command,
stream: stream,
select: this.condition.select
});

if (_.includes(Command.FLAGS.WILL_DISCONNECT, command.name)) {
this.manuallyClosing = true;
Expand Down
18 changes: 12 additions & 6 deletions lib/redis/event_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,32 +99,38 @@ exports.readyHandler = function (self) {
self.setStatus('ready');
self.retryAttempts = 0;

var item;
var finalSelect = self.condition.select;
self.condition.select = 0;

if (self.prevCommandQueue) {
if (self.options.autoResendUnfulfilledCommands) {
debug('resend %d unfulfilled commands', self.prevCommandQueue.length);
while (self.prevCommandQueue.length) {
var command = self.prevCommandQueue.shift();
self.sendCommand(command);
while (self.prevCommandQueue.length > 0) {
item = self.prevCommandQueue.shift();
if (item.select !== self.condition.select && item.command.name !== 'select') {
self.select(item.select);
}
self.sendCommand(item.command, item.stream);
}
} else {
self.prevCommandQueue = null;
}
}

var finalSelect = self.condition.select;
self.condition.select = 0;
if (self.offlineQueue.length) {
debug('send %d commands in offline queue', self.offlineQueue.length);
var offlineQueue = self.offlineQueue;
self.offlineQueue = [];
while (offlineQueue.length > 0) {
var item = offlineQueue.shift();
item = offlineQueue.shift();
if (item.select !== self.condition.select && item.command.name !== 'select') {
self.select(item.select);
}
self.sendCommand(item.command, item.stream);
}
}

if (self.condition.select !== finalSelect) {
debug('connect to db [%d]', finalSelect);
self.selectBuffer(finalSelect);
Expand Down
45 changes: 20 additions & 25 deletions lib/redis/prototype/parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ exports.initParser = function () {
};

exports.returnError = function (err) {
var command = this.commandQueue.shift();
var command = this.commandQueue.shift().command;

err.command = {
name: command.name,
Expand Down Expand Up @@ -68,7 +68,7 @@ exports.returnReply = function (reply) {
}
}

var command, channel, count;
var item, channel, count;
if (this.condition.mode.subscriber) {
var replyType = Array.isArray(reply) ? reply[0].toString() : null;
debug('receive reply "%s" in subscriber mode', replyType);
Expand All @@ -95,9 +95,9 @@ exports.returnReply = function (reply) {
case 'psubscribe':
channel = reply[1].toString();
this.condition.mode.subscriber.add(replyType, channel);
command = shiftCommand(this);
if (!fillSubCommand(command, reply[2])) {
this.commandQueue.unshift(command);
item = this.commandQueue.shift();
if (!fillSubCommand(item.command, reply[2])) {
this.commandQueue.unshift(item);
}
break;
case 'unsubscribe':
Expand All @@ -110,41 +110,36 @@ exports.returnReply = function (reply) {
if (count === 0) {
this.condition.mode.subscriber = false;
}
command = shiftCommand(this);
if (!fillUnsubCommand(command, count)) {
this.commandQueue.unshift(command);
item = this.commandQueue.shift();
if (!fillUnsubCommand(item.command, count)) {
this.commandQueue.unshift(item);
}
break;
default:
command = shiftCommand(this);
command.resolve(reply);
item = this.commandQueue.shift();
item.command.resolve(reply);
}
} else {
command = shiftCommand(this);
if (!command) {
item = this.commandQueue.shift();
if (!item) {
return this.emit('error', new Error('Command queue state error. If you can reproduce this, please report it.'));
}
if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name)) {
if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, item.command.name)) {
this.condition.mode.subscriber = new SubscriptionSet();
this.condition.mode.subscriber.add(command.name, reply[1].toString());
this.condition.mode.subscriber.add(item.command.name, reply[1].toString());

if (!fillSubCommand(command, reply[2])) {
this.commandQueue.unshift(command);
if (!fillSubCommand(item.command, reply[2])) {
this.commandQueue.unshift(item);
}
} else if (_.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, command.name)) {
if (!fillUnsubCommand(command, reply[2])) {
this.commandQueue.unshift(command);
} else if (_.includes(Command.FLAGS.EXIT_SUBSCRIBER_MODE, item.command.name)) {
if (!fillUnsubCommand(item.command, reply[2])) {
this.commandQueue.unshift(item);
}
} else {
command.resolve(reply);
item.command.resolve(reply);
}
}

function shiftCommand(self) {
var command = self.commandQueue.shift();
return command;
}

function fillSubCommand(command, count) {
if (typeof command.remainReplies === 'undefined') {
command.remainReplies = command.args.length;
Expand Down
27 changes: 17 additions & 10 deletions test/functional/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,21 +140,28 @@ describe('connection', function () {
});

describe('autoResendUnfulfilledCommands', function () {
it('should resend unfulfilled when reconnected', function (done) {
var redis = new Redis();
var pub = new Redis();
it('should resend unfulfilled commands to the correct db when reconnected', function (done) {
var redis = new Redis({ db: 3 });
var pub = new Redis({ db: 3 });
redis.once('ready', function () {
var write = redis.stream.write;
redis.stream.write = function () {
write.apply(redis.stream, arguments);
redis.stream.write = write;
redis.stream.end();
};
var pending = 2;
redis.blpop('l', 0, function (err, res) {
expect(res[0]).to.eql('l');
expect(res[1]).to.eql('1');
done();
if (!--pending) {
done();
}
});
redis.set('foo', '1');
redis.pipeline().incr('foo').exec(function (err, res) {
expect(res[0][1]).to.eql(2);
if (!--pending) {
done();
}
});
setTimeout(function () {
redis.stream.end();
}, 0);
});
redis.once('close', function () {
pub.lpush('l', 1);
Expand Down

0 comments on commit dd2ad38

Please sign in to comment.