Skip to content

Commit

Permalink
chore: internal typing improvements (#1708)
Browse files Browse the repository at this point in the history
  • Loading branch information
luin authored Jan 25, 2023
1 parent a22fd2d commit 96a9f41
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 65 deletions.
19 changes: 14 additions & 5 deletions lib/DataHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,29 @@ const debug = Debug("dataHandler");

type ReplyData = string | Buffer | number | Array<string | Buffer | number>;

interface Condition {
export interface Condition {
select: number;
auth: string;
auth?: string | [string, string];
subscriber: false | SubscriptionSet;
}

interface DataHandledable extends EventEmitter {
export type FlushQueueOptions = {
offlineQueue?: boolean;
commandQueue?: boolean;
};

export interface DataHandledable extends EventEmitter {
stream: NetStream;
status: string;
condition: Condition;
condition: Condition | null;
commandQueue: Deque<CommandItem>;

disconnect(reconnect: boolean): void;
recoverFromFatalError(commandError: Error, err: Error, options: any): void;
recoverFromFatalError(
commandError: Error,
err: Error,
options: FlushQueueOptions
): void;
handleReconnection(err: Error, item: CommandItem): void;
}

Expand Down
113 changes: 65 additions & 48 deletions lib/Redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { EventEmitter } from "events";
import asCallback from "standard-as-callback";
import Cluster from "./cluster";
import Command from "./Command";
import { DataHandledable, FlushQueueOptions, Condition } from "./DataHandler";
import { StandaloneConnector } from "./connectors";
import AbstractConnector from "./connectors/AbstractConnector";
import SentinelConnector from "./connectors/SentinelConnector";
Expand Down Expand Up @@ -60,7 +61,7 @@ type RedisStatus =
* }
* ```
*/
class Redis extends Commander {
class Redis extends Commander implements DataHandledable {
static Cluster = Cluster;
static Command = Command;
/**
Expand Down Expand Up @@ -89,14 +90,18 @@ class Redis extends Commander {
*/
isCluster = false;

/**
* @ignore
*/
condition: Condition | null;

/**
* @ignore
*/
commandQueue: Deque<CommandItem>;

private connector: AbstractConnector;
private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
private condition: {
select: number;
auth?: string | [string, string];
subscriber: boolean;
};
private commandQueue: Deque<CommandItem>;
private offlineQueue: Deque;
private connectionEpoch = 0;
private retryAttempts = 0;
Expand Down Expand Up @@ -220,9 +225,11 @@ class Redis extends Commander {

// Node ignores setKeepAlive before connect, therefore we wait for the event:
// https://github.com/nodejs/node/issues/31663
if (typeof options.keepAlive === 'number') {
if (typeof options.keepAlive === "number") {
if (stream.connecting) {
stream.once(CONNECT_EVENT, () => stream.setKeepAlive(true, options.keepAlive));
stream.once(CONNECT_EVENT, () => {
stream.setKeepAlive(true, options.keepAlive);
});
} else {
stream.setKeepAlive(true, options.keepAlive);
}
Expand Down Expand Up @@ -344,10 +351,10 @@ class Redis extends Commander {
* One of `"normal"`, `"subscriber"`, or `"monitor"`. When the connection is
* not in `"normal"` mode, certain commands are not allowed.
*/
get mode(): "normal" | "subscriber" | "monitor" {
get mode(): "normal" | "subscriber" | "monitor" {
return this.options.monitor
? "monitor"
: this.condition && this.condition.subscriber
: this.condition?.subscriber
? "subscriber"
: "normal";
}
Expand Down Expand Up @@ -421,7 +428,7 @@ class Redis extends Commander {
return command.promise;
}
if (
this.condition.subscriber &&
this.condition?.subscriber &&
!Command.checkFlag("VALID_IN_SUBSCRIBER_MODE", command.name)
) {
command.reject(
Expand Down Expand Up @@ -491,7 +498,7 @@ class Redis extends Commander {
debug(
"write command[%s]: %d -> %s(%o)",
this._getDescription(),
this.condition.select,
this.condition?.select,
command.name,
command.args
);
Expand Down Expand Up @@ -600,45 +607,22 @@ class Redis extends Commander {
}

/**
* Get description of the connection. Used for debugging.
* @ignore
*/
private _getDescription() {
let description;
if ("path" in this.options && this.options.path) {
description = this.options.path;
} else if (
this.stream &&
this.stream.remoteAddress &&
this.stream.remotePort
) {
description = this.stream.remoteAddress + ":" + this.stream.remotePort;
} else if ("host" in this.options && this.options.host) {
description = this.options.host + ":" + this.options.port;
} else {
// Unexpected
description = "";
}
if (this.options.connectionName) {
description += ` (${this.options.connectionName})`;
}
return description;
}

private resetCommandQueue() {
this.commandQueue = new Deque();
}

private resetOfflineQueue() {
this.offlineQueue = new Deque();
}

private recoverFromFatalError(commandError, err: Error | null, options) {
recoverFromFatalError(
_commandError: Error,
err: Error,
options: FlushQueueOptions
) {
this.flushQueue(err, options);
this.silentEmit("error", err);
this.disconnect(true);
}

private handleReconnection(err: Error, item: CommandItem) {
/**
* @ignore
*/
handleReconnection(err: Error, item: CommandItem) {
let needReconnect: ReturnType<ReconnectOnError> = false;
if (this.options.reconnectOnError) {
needReconnect = this.options.reconnectOnError(err);
Expand All @@ -657,7 +641,7 @@ class Redis extends Commander {
this.disconnect(true);
}
if (
this.condition.select !== item.select &&
this.condition?.select !== item.select &&
item.command.name !== "select"
) {
this.select(item.select);
Expand All @@ -671,6 +655,39 @@ class Redis extends Commander {
}
}

/**
* Get description of the connection. Used for debugging.
*/
private _getDescription() {
let description;
if ("path" in this.options && this.options.path) {
description = this.options.path;
} else if (
this.stream &&
this.stream.remoteAddress &&
this.stream.remotePort
) {
description = this.stream.remoteAddress + ":" + this.stream.remotePort;
} else if ("host" in this.options && this.options.host) {
description = this.options.host + ":" + this.options.port;
} else {
// Unexpected
description = "";
}
if (this.options.connectionName) {
description += ` (${this.options.connectionName})`;
}
return description;
}

private resetCommandQueue() {
this.commandQueue = new Deque();
}

private resetOfflineQueue() {
this.offlineQueue = new Deque();
}

private parseOptions(...args: unknown[]) {
const options: Record<string, unknown> = {};
let isTls = false;
Expand Down Expand Up @@ -744,7 +761,7 @@ class Redis extends Commander {
* @param error The error object to send to the commands
* @param options options
*/
private flushQueue(error: Error, options?: RedisOptions) {
private flushQueue(error: Error, options?: FlushQueueOptions) {
options = defaults({}, options, {
offlineQueue: true,
commandQueue: true,
Expand Down
2 changes: 1 addition & 1 deletion lib/ScanStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface Options extends ReadableOptions {
}

/**
* Convenient class to convert the process of scaning keys to a readable stream.
* Convenient class to convert the process of scanning keys to a readable stream.
*/
export default class ScanStream extends Readable {
private _redisCursor = "0";
Expand Down
2 changes: 1 addition & 1 deletion lib/Script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export default class Script {

constructor(
private lua: string,
private numberOfKeys: number = null,
private numberOfKeys: number | null = null,
private keyPrefix: string = "",
private readOnly: boolean = false
) {
Expand Down
2 changes: 1 addition & 1 deletion lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ class Cluster extends Commander {
redis = nodes[0];
}
} else {
let key;
let key: string;
if (to === "all") {
key = sample(nodeKeys);
} else if (to === "slave" && nodeKeys.length > 1) {
Expand Down
4 changes: 2 additions & 2 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ function abortError(command: Respondable) {
function abortIncompletePipelines(commandQueue: Deque<CommandItem>) {
let expectedIndex = 0;
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
const command = commandQueue.peekAt(i)?.command as Command;
const pipelineIndex = command.pipelineIndex;
if (pipelineIndex === undefined || pipelineIndex === 0) {
expectedIndex = 0;
Expand All @@ -135,7 +135,7 @@ function abortIncompletePipelines(commandQueue: Deque<CommandItem>) {
// offline queue
function abortTransactionFragments(commandQueue: Deque<CommandItem>) {
for (let i = 0; i < commandQueue.length; ) {
const command = commandQueue.peekAt(i).command as Command;
const command = commandQueue.peekAt(i)?.command as Command;
if (command.name === "multi") {
break;
}
Expand Down
16 changes: 9 additions & 7 deletions lib/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ export function convertBufferToString(value: any, encoding?: BufferEncoding) {
* expect(output).to.eql([[null, 'a'], [null, 'b'], [new Error('c')], [null, 'd'])
* ```
*/
export function wrapMultiResult(arr: unknown[] | null): unknown[][] {
export function wrapMultiResult(arr: unknown[] | null): unknown[][] | null {
// When using WATCH/EXEC transactions, the EXEC will return
// a null instead of an array
if (!arr) {
return null;
}
const result = [];
const result: unknown[][] = [];
const length = arr.length;
for (let i = 0; i < length; ++i) {
const item = arr[i];
Expand Down Expand Up @@ -133,7 +133,7 @@ export function timeout<T>(
export function convertObjectToArray<T>(
obj: Record<string, T>
): (string | T)[] {
const result = [];
const result: (string | T)[] = [];
const keys = Object.keys(obj); // Object.entries requires node 7+

for (let i = 0, l = keys.length; i < l; i++) {
Expand Down Expand Up @@ -185,7 +185,7 @@ export function optimizeErrorStack(
) {
const stacks = friendlyStack.split("\n");
let lines = "";
let i;
let i: number;
for (i = 1; i < stacks.length; ++i) {
if (stacks[i].indexOf(filterPath) === -1) {
break;
Expand All @@ -194,8 +194,10 @@ export function optimizeErrorStack(
for (let j = i; j < stacks.length; ++j) {
lines += "\n" + stacks[j];
}
const pos = error.stack.indexOf("\n");
error.stack = error.stack.slice(0, pos) + lines;
if (error.stack) {
const pos = error.stack.indexOf("\n");
error.stack = error.stack.slice(0, pos) + lines;
}
return error;
}

Expand Down Expand Up @@ -278,7 +280,7 @@ export function resolveTLSProfile(options: TLSOptions): TLSOptions {
export function sample<T>(array: T[], from = 0): T {
const length = array.length;
if (from >= length) {
return;
return null;
}
return array[from + Math.floor(Math.random() * (length - from))];
}
Expand Down

0 comments on commit 96a9f41

Please sign in to comment.