From 2f9c63179ecd89208c229938252d5c825ef9db59 Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 14 Mar 2021 23:22:26 +0800 Subject: [PATCH] Handle instant stream error --- lib/connectors/AbstractConnector.ts | 1 + lib/connectors/SentinelConnector/index.ts | 5 +++++ lib/connectors/StandaloneConnector.ts | 5 +++++ lib/redis/index.ts | 16 ++++++++++++--- test/functional/connection.ts | 24 +++++++++++++++++++++++ 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/lib/connectors/AbstractConnector.ts b/lib/connectors/AbstractConnector.ts index 1d9b20221..276c33133 100644 --- a/lib/connectors/AbstractConnector.ts +++ b/lib/connectors/AbstractConnector.ts @@ -5,6 +5,7 @@ export type ErrorEmitter = (type: string, err: Error) => void; export default abstract class AbstractConnector { protected connecting = false; protected stream: NetStream; + public firstError?: Error; public check(info: any): boolean { return true; diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index bcde400d1..4349677d5 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -135,6 +135,11 @@ export default class SentinelConnector extends AbstractConnector { } else { this.stream = createConnection(resolved); } + + this.stream.once("error", (err) => { + this.firstError = err; + }); + this.sentinelIterator.reset(true); resolve(this.stream); } else { diff --git a/lib/connectors/StandaloneConnector.ts b/lib/connectors/StandaloneConnector.ts index 71a36fddd..1f022fab3 100644 --- a/lib/connectors/StandaloneConnector.ts +++ b/lib/connectors/StandaloneConnector.ts @@ -3,6 +3,7 @@ import { connect as createTLSConnection, SecureContextOptions } from "tls"; import { CONNECTION_CLOSED_ERROR_MSG } from "../utils"; import AbstractConnector, { ErrorEmitter } from "./AbstractConnector"; import { NetStream } from "../types"; +import { last } from "lodash"; export function isIIpcConnectionOptions( value: any @@ -76,6 +77,10 @@ export default class StandaloneConnector extends AbstractConnector { return; } + this.stream.once("error", (err) => { + this.firstError = err; + }); + resolve(this.stream); }); }); diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 7c6b255e1..54adfa758 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -287,7 +287,7 @@ Redis.prototype.setStatus = function (status, arg) { */ Redis.prototype.connect = function (callback) { const _Promise = PromiseContainer.get(); - const promise = new _Promise((resolve, reject) => { + const promise = new _Promise((resolve, reject) => { if ( this.status === "connecting" || this.status === "connect" || @@ -370,11 +370,21 @@ Redis.prototype.connect = function (callback) { stream.setTimeout(0); }); } + } else if (stream.destroyed) { + const firstError = _this.connector.firstError; + if (firstError) { + process.nextTick(() => { + eventHandler.errorHandler(_this)(firstError); + }); + } + process.nextTick(eventHandler.closeHandler(_this)); } else { process.nextTick(eventHandler.connectHandler(_this)); } - stream.once("error", eventHandler.errorHandler(_this)); - stream.once("close", eventHandler.closeHandler(_this)); + if (!stream.destroyed) { + stream.once("error", eventHandler.errorHandler(_this)); + stream.once("close", eventHandler.closeHandler(_this)); + } if (options.noDelay) { stream.setNoDelay(true); diff --git a/test/functional/connection.ts b/test/functional/connection.ts index 90064b291..9f956727c 100644 --- a/test/functional/connection.ts +++ b/test/functional/connection.ts @@ -240,6 +240,30 @@ describe("connection", function () { done(); }); }); + + it("should close if socket destroyed before being returned", function (done) { + const message = "instant error"; + sinon.stub(net, "createConnection").callsFake(function () { + const socket = (net.createConnection as any).wrappedMethod.apply( + net, + arguments + ) as net.Socket; + socket.destroy(new Error(message)); + return socket; + }); + + let errored = false; + const redis = new Redis(); + redis.on("error", (err) => { + expect(err.message).to.equal(message); + errored = true; + }); + redis.on("close", () => { + expect(errored).to.equal(true); + redis.disconnect(); + done(); + }); + }); }); describe("retryStrategy", function () {