Skip to content

Commit

Permalink
Handle instant stream error
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Mar 14, 2021
1 parent 8524eea commit 2f9c631
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 3 deletions.
1 change: 1 addition & 0 deletions lib/connectors/AbstractConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type ErrorEmitter = (type: string, err: Error) => void;
export default abstract class AbstractConnector {
protected connecting = false;
protected stream: NetStream;
public firstError?: Error;

public check(info: any): boolean {
return true;
Expand Down
5 changes: 5 additions & 0 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ export default class SentinelConnector extends AbstractConnector {
} else {
this.stream = createConnection(resolved);
}

this.stream.once("error", (err) => {
this.firstError = err;
});

this.sentinelIterator.reset(true);
resolve(this.stream);
} else {
Expand Down
5 changes: 5 additions & 0 deletions lib/connectors/StandaloneConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { connect as createTLSConnection, SecureContextOptions } from "tls";
import { CONNECTION_CLOSED_ERROR_MSG } from "../utils";
import AbstractConnector, { ErrorEmitter } from "./AbstractConnector";
import { NetStream } from "../types";
import { last } from "lodash";

export function isIIpcConnectionOptions(
value: any
Expand Down Expand Up @@ -76,6 +77,10 @@ export default class StandaloneConnector extends AbstractConnector {
return;
}

this.stream.once("error", (err) => {
this.firstError = err;
});

resolve(this.stream);
});
});
Expand Down
16 changes: 13 additions & 3 deletions lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ Redis.prototype.setStatus = function (status, arg) {
*/
Redis.prototype.connect = function (callback) {
const _Promise = PromiseContainer.get();
const promise = new _Promise((resolve, reject) => {
const promise = new _Promise<void>((resolve, reject) => {
if (
this.status === "connecting" ||
this.status === "connect" ||
Expand Down Expand Up @@ -370,11 +370,21 @@ Redis.prototype.connect = function (callback) {
stream.setTimeout(0);
});
}
} else if (stream.destroyed) {
const firstError = _this.connector.firstError;
if (firstError) {
process.nextTick(() => {
eventHandler.errorHandler(_this)(firstError);
});
}
process.nextTick(eventHandler.closeHandler(_this));
} else {
process.nextTick(eventHandler.connectHandler(_this));
}
stream.once("error", eventHandler.errorHandler(_this));
stream.once("close", eventHandler.closeHandler(_this));
if (!stream.destroyed) {
stream.once("error", eventHandler.errorHandler(_this));
stream.once("close", eventHandler.closeHandler(_this));
}

if (options.noDelay) {
stream.setNoDelay(true);
Expand Down
24 changes: 24 additions & 0 deletions test/functional/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,30 @@ describe("connection", function () {
done();
});
});

it("should close if socket destroyed before being returned", function (done) {
const message = "instant error";
sinon.stub(net, "createConnection").callsFake(function () {
const socket = (net.createConnection as any).wrappedMethod.apply(
net,
arguments
) as net.Socket;
socket.destroy(new Error(message));
return socket;
});

let errored = false;
const redis = new Redis();
redis.on("error", (err) => {
expect(err.message).to.equal(message);
errored = true;
});
redis.on("close", () => {
expect(errored).to.equal(true);
redis.disconnect();
done();
});
});
});

describe("retryStrategy", function () {
Expand Down

0 comments on commit 2f9c631

Please sign in to comment.