Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticache: Abort incomplete pipelines and transactions upon reconnect #1084

Merged
merged 1 commit into from
Mar 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/basic_operations.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ redis.sadd("set", [1, 3, 5, 7]);
redis.spop("set"); // Promise resolves to "5" or another item in the set

// Most responses are strings, or arrays of strings
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three")
redis.zadd("sortedSet", 1, "one", 2, "dos", 4, "quatro", 3, "three");
redis.zrange("sortedSet", 0, 2, "WITHSCORES").then(res => console.log(res)); // Promise resolves to ["one", "1", "dos", "2", "three", "3"] as if the command was ` redis> ZRANGE sortedSet 0 2 WITHSCORES `

// Some responses have transformers to JS values
Expand Down
2 changes: 2 additions & 0 deletions lib/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ export default class Command implements ICommand {
private callback: CallbackFunction;
private transformed: boolean = false;
public isCustomCommand: boolean = false;
public inTransaction: boolean = false;
public pipelineIndex?: number;

private slot?: number | null;
private keys?: Array<string | Buffer>;
Expand Down
21 changes: 8 additions & 13 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,9 @@ Pipeline.prototype.fillResult = function(value, position) {
if (this.isCluster) {
let retriable = true;
let commonError: { name: string; message: string };
let inTransaction: boolean;
for (let i = 0; i < this._result.length; ++i) {
var error = this._result[i][0];
var command = this._queue[i];
if (command.name === "multi") {
inTransaction = true;
} else if (command.name === "exec") {
inTransaction = false;
}
if (error) {
if (
command.name === "exec" &&
Expand All @@ -94,7 +88,7 @@ Pipeline.prototype.fillResult = function(value, position) {
retriable = false;
break;
}
} else if (!inTransaction) {
} else if (!command.inTransaction) {
var isReadOnly =
exists(command.name) && hasFlag(command.name, "readonly");
if (!isReadOnly) {
Expand All @@ -107,7 +101,7 @@ Pipeline.prototype.fillResult = function(value, position) {
var _this = this;
var errv = commonError.message.split(" ");
var queue = this._queue;
inTransaction = false;
let inTransaction = false;
this._queue = [];
for (let i = 0; i < queue.length; ++i) {
if (
Expand All @@ -122,11 +116,7 @@ Pipeline.prototype.fillResult = function(value, position) {
}
queue[i].initPromise();
this.sendCommand(queue[i]);
if (queue[i].name === "multi") {
inTransaction = true;
} else if (queue[i].name === "exec") {
inTransaction = false;
}
inTransaction = queue[i].inTransaction;
}

let matched = true;
Expand Down Expand Up @@ -174,7 +164,12 @@ Pipeline.prototype.fillResult = function(value, position) {
};

Pipeline.prototype.sendCommand = function(command) {
if (this._transactions > 0) {
command.inTransaction = true;
}

const position = this._queue.length;
command.pipelineIndex = position;

command.promise
.then(result => {
Expand Down
62 changes: 62 additions & 0 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"use strict";

import Deque = require("denque");
import { AbortError } from "redis-errors";
import Command from "../command";
import { MaxRetriesPerRequestError } from "../errors";
import { ICommandItem, ICommand } from "../types";
import { Debug, noop, CONNECTION_CLOSED_ERROR_MSG } from "../utils";
import DataHandler from "../DataHandler";

Expand Down Expand Up @@ -77,6 +80,61 @@ export function connectHandler(self) {
};
}

function abortError(command: ICommand) {
const err = new AbortError("Command aborted due to connection close");
(err as any).command = {
name: command.name,
args: command.args
};
return err;
}

// If a contiguous set of pipeline commands starts from index zero then they
// can be safely reattempted. If however we have a chain of pipelined commands
// starting at index 1 or more it means we received a partial response before
// the connection close and those pipelined commands must be aborted. For
// example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
// aborting and purging we'll have a queue that looks like this: [0, 1, 2]
function abortIncompletePipelines(commandQueue: Deque<ICommandItem>) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do feel like this approach is treating the downstream symptoms rather than the root cause of the problem. The tradeoff is that it isolates connection handling cleanup stuff inside event_handler so that transaction.ts and pipeline.ts can be more decoupled from reconnection behaviors.

I'm open to other ways of approaching this.

let expectedIndex = 0;
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
const pipelineIndex = command.pipelineIndex;
if (pipelineIndex === undefined || pipelineIndex === 0) {
expectedIndex = 0;
}
if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
commandQueue.remove(i, 1);
command.reject(abortError(command));
continue;
}
i++;
}
}

// If only a partial transaction result was received before connection close,
// we have to abort any transaction fragments that may have ended up in the
// offline queue
function abortTransactionFragments(commandQueue: Deque<ICommandItem>) {
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
if (command.name === "multi") {
break;
}
if (command.name === "exec") {
commandQueue.remove(i, 1);
command.reject(abortError(command));
break;
}
if ((command as Command).inTransaction) {
commandQueue.remove(i, 1);
command.reject(abortError(command));
} else {
i++;
}
}
}

export function closeHandler(self) {
return function() {
self.setStatus("close");
Expand All @@ -85,8 +143,12 @@ export function closeHandler(self) {
self.prevCondition = self.condition;
}
if (self.commandQueue.length) {
abortIncompletePipelines(self.commandQueue);
self.prevCommandQueue = self.commandQueue;
}
if (self.offlineQueue.length) {
abortTransactionFragments(self.offlineQueue);
}

if (self.manuallyClosing) {
self.manuallyClosing = false;
Expand Down
153 changes: 153 additions & 0 deletions test/functional/elasticache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import Redis from "../../lib/redis";
import { expect } from "chai";
import MockServer from "../helpers/mock_server";

// AWS Elasticache closes the connection immediately when it encounters a READONLY error
function simulateElasticache(options: {
reconnectOnErrorValue: boolean | number;
}) {
let inTransaction = false;
const mockServer = new MockServer(30000, (argv, socket, flags) => {
switch (argv[0]) {
case "multi":
inTransaction = true;
return MockServer.raw("+OK\r\n");
case "del":
flags.disconnect = true;
return new Error(
"READONLY You can't write against a read only replica."
);
case "get":
return inTransaction ? MockServer.raw("+QUEUED\r\n") : argv[1];
case "exec":
inTransaction = false;
return [];
}
});

return new Redis({
port: 30000,
reconnectOnError(err: Error): boolean | number {
// bring the mock server back up
mockServer.connect();
return options.reconnectOnErrorValue;
}
});
}

function expectReplyError(err) {
expect(err).to.exist;
expect(err.name).to.eql("ReplyError");
}

function expectAbortError(err) {
expect(err).to.exist;
expect(err.name).to.eql("AbortError");
expect(err.message).to.eql("Command aborted due to connection close");
}

describe("elasticache", function() {
it("should abort a failed transaction when connection is lost", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: true });

redis
.multi()
.del("foo")
.del("bar")
.exec(err => {
expectAbortError(err);
expect(err.command).to.eql({
name: "exec",
args: []
});
expect(err.previousErrors).to.have.lengthOf(2);
expectReplyError(err.previousErrors[0]);
expect(err.previousErrors[0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(err.previousErrors[1]);
expect(err.previousErrors[1].command).to.eql({
name: "del",
args: ["bar"]
});

// ensure we've recovered into a healthy state
redis.get("foo", (err, res) => {
expect(res).to.eql("foo");
done();
});
});
});

it("should not resend failed transaction commands", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: 2 });
redis
.multi()
.del("foo")
.get("bar")
.exec(err => {
expectAbortError(err);
expect(err.command).to.eql({
name: "exec",
args: []
});
expect(err.previousErrors).to.have.lengthOf(2);
expectAbortError(err.previousErrors[0]);
expect(err.previousErrors[0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(err.previousErrors[1]);
expect(err.previousErrors[1].command).to.eql({
name: "get",
args: ["bar"]
});

// ensure we've recovered into a healthy state
redis.get("foo", (err, res) => {
expect(res).to.eql("foo");
done();
});
});
});

it("should resend intact pipelines", function(done) {
const redis = simulateElasticache({ reconnectOnErrorValue: true });

let p1Result;
redis
.pipeline()
.del("foo")
.get("bar")
.exec((err, result) => (p1Result = result));

redis
.pipeline()
.get("baz")
.get("qux")
.exec((err, p2Result) => {
// First pipeline should have been aborted
expect(p1Result).to.have.lengthOf(2);
expect(p1Result[0]).to.have.lengthOf(1);
expect(p1Result[1]).to.have.lengthOf(1);
expectReplyError(p1Result[0][0]);
expect(p1Result[0][0].command).to.eql({
name: "del",
args: ["foo"]
});
expectAbortError(p1Result[1][0]);
expect(p1Result[1][0].command).to.eql({
name: "get",
args: ["bar"]
});

// Second pipeline was intact and should have been retried successfully
expect(p2Result).to.have.lengthOf(2);
expect(p2Result[0]).to.eql([null, "baz"]);
expect(p2Result[1]).to.eql([null, "qux"]);

done();
});
});
});
35 changes: 35 additions & 0 deletions test/functional/reconnect_on_error.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Redis from "../../lib/redis";
import { expect } from "chai";
import * as sinon from "sinon";

describe("reconnectOnError", function() {
it("should pass the error as the first param", function(done) {
Expand Down Expand Up @@ -109,4 +110,38 @@ describe("reconnectOnError", function() {
done();
});
});

it("should work with pipelined multi", function(done) {
var redis = new Redis({
reconnectOnError: function() {
// deleting foo allows sadd below to succeed on the second try
redis.del("foo");
return 2;
}
});
var delSpy = sinon.spy(redis, "del");

redis.set("foo", "bar");
redis.set("i", 1);
redis
.pipeline()
.sadd("foo", "a") // trigger a WRONGTYPE error
.multi()
.get("foo")
.incr("i")
.exec()
.exec(function(err, res) {
expect(delSpy.calledOnce).to.eql(true);
expect(delSpy.firstCall.args[0]).to.eql("foo");
expect(err).to.be.null;
expect(res).to.eql([
[null, 1],
[null, "OK"],
[null, "QUEUED"],
[null, "QUEUED"],
[null, ["bar", 2]]
]);
done();
});
});
});
16 changes: 14 additions & 2 deletions test/helpers/mock_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ export function getConnectionName(socket: Socket): string | undefined {
return connectionNameMap.get(socket);
}

export type MockServerHandler = (reply: any, socket: Socket) => any;
interface IFlags {
disconnect?: boolean;
}
export type MockServerHandler = (
reply: any,
socket: Socket,
flags: IFlags
) => any;

export default class MockServer extends EventEmitter {
static REDIS_OK = "+OK";
Expand Down Expand Up @@ -84,7 +91,12 @@ export default class MockServer extends EventEmitter {
this.write(c, this.slotTable);
return;
}
this.write(c, this.handler && this.handler(reply, c));
let flags: Flags = {};
let handlerResult = this.handler && this.handler(reply, c, flags);
this.write(c, handlerResult);
if (flags.disconnect) {
this.disconnect();
}
},
returnError: function() {}
});
Expand Down
Loading