Skip to content

Commit

Permalink
feat(node-http-handler): defer socket event listener attachment (smit…
Browse files Browse the repository at this point in the history
  • Loading branch information
kuhe authored Sep 5, 2024
1 parent 9f3f2f5 commit c86a02c
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 61 deletions.
5 changes: 5 additions & 0 deletions .changeset/empty-feet-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@smithy/node-http-handler": minor
---

defer socket event listeners for node:http
58 changes: 32 additions & 26 deletions packages/node-http-handler/src/node-http-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,20 @@ or increase socketAcquisitionWarningTimeout=(millis) in the NodeHttpHandler conf
this.config = await this.configProvider;
}

let socketCheckTimeoutId: NodeJS.Timeout;

return new Promise((_resolve, _reject) => {
let writeRequestBodyPromise: Promise<void> | undefined = undefined;

// Timeouts related to this request to clear upon completion.
const timeouts = [] as (number | NodeJS.Timeout)[];

const resolve = async (arg: { response: HttpResponse }) => {
await writeRequestBodyPromise;
// if requests are still resolving, cancel the socket usage check.
clearTimeout(socketCheckTimeoutId);
timeouts.forEach(clearTimeout);
_resolve(arg);
};
const reject = async (arg: unknown) => {
await writeRequestBodyPromise;
clearTimeout(socketCheckTimeoutId);
timeouts.forEach(clearTimeout);
_reject(arg);
};

Expand All @@ -180,16 +181,18 @@ or increase socketAcquisitionWarningTimeout=(millis) in the NodeHttpHandler conf

// If the request is taking a long time, check socket usage and potentially warn.
// This warning will be cancelled if the request resolves.
socketCheckTimeoutId = setTimeout(
() => {
this.socketWarningTimestamp = NodeHttpHandler.checkSocketUsage(
agent,
this.socketWarningTimestamp,
this.config!.logger
);
},
this.config.socketAcquisitionWarningTimeout ??
(this.config.requestTimeout ?? 2000) + (this.config.connectionTimeout ?? 1000)
timeouts.push(
setTimeout(
() => {
this.socketWarningTimestamp = NodeHttpHandler.checkSocketUsage(
agent,
this.socketWarningTimestamp,
this.config!.logger
);
},
this.config.socketAcquisitionWarningTimeout ??
(this.config.requestTimeout ?? 2000) + (this.config.connectionTimeout ?? 1000)
)
);

const queryString = buildQueryString(request.query || {});
Expand Down Expand Up @@ -237,10 +240,6 @@ or increase socketAcquisitionWarningTimeout=(millis) in the NodeHttpHandler conf
}
});

// wire-up any timeout logic
setConnectionTimeout(req, reject, this.config.connectionTimeout);
setSocketTimeout(req, reject, this.config.requestTimeout);

// wire-up abort logic
if (abortSignal) {
const onAbort = () => {
Expand All @@ -261,19 +260,26 @@ or increase socketAcquisitionWarningTimeout=(millis) in the NodeHttpHandler conf
}
}

// Defer registration of socket event listeners if the connection and request timeouts
// are longer than a few seconds. This avoids slowing down faster operations.
timeouts.push(setConnectionTimeout(req, reject, this.config.connectionTimeout));
timeouts.push(setSocketTimeout(req, reject, this.config.requestTimeout));

// Workaround for bug report in Node.js https://github.com/nodejs/node/issues/47137
const httpAgent = nodeHttpsOptions.agent;
if (typeof httpAgent === "object" && "keepAlive" in httpAgent) {
setSocketKeepAlive(req, {
// @ts-expect-error keepAlive is not public on httpAgent.
keepAlive: (httpAgent as hAgent).keepAlive,
// @ts-expect-error keepAliveMsecs is not public on httpAgent.
keepAliveMsecs: (httpAgent as hAgent).keepAliveMsecs,
});
timeouts.push(
setSocketKeepAlive(req, {
// @ts-expect-error keepAlive is not public on httpAgent.
keepAlive: (httpAgent as hAgent).keepAlive,
// @ts-expect-error keepAliveMsecs is not public on httpAgent.
keepAliveMsecs: (httpAgent as hAgent).keepAliveMsecs,
})
);
}

writeRequestBodyPromise = writeRequestBody(req, request, this.config.requestTimeout).catch((e) => {
clearTimeout(socketCheckTimeoutId);
timeouts.forEach(clearTimeout);
return _reject(e);
});
});
Expand Down
53 changes: 34 additions & 19 deletions packages/node-http-handler/src/set-connection-timeout.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,43 @@
import { ClientRequest } from "http";
import { Socket } from "net";

export const setConnectionTimeout = (request: ClientRequest, reject: (err: Error) => void, timeoutInMs = 0) => {
const DEFER_EVENT_LISTENER_TIME = 1000;

export const setConnectionTimeout = (
request: ClientRequest,
reject: (err: Error) => void,
timeoutInMs = 0
): NodeJS.Timeout | number => {
if (!timeoutInMs) {
return;
return -1;
}

// Throw a connecting timeout error unless a connection is made within time.
const timeoutId = setTimeout(() => {
request.destroy();
reject(
Object.assign(new Error(`Socket timed out without establishing a connection within ${timeoutInMs} ms`), {
name: "TimeoutError",
})
);
}, timeoutInMs);
const registerTimeout = (offset: number) => {
// Throw a connecting timeout error unless a connection is made within time.
const timeoutId = setTimeout(() => {
request.destroy();
reject(
Object.assign(new Error(`Socket timed out without establishing a connection within ${timeoutInMs} ms`), {
name: "TimeoutError",
})
);
}, timeoutInMs - offset);

request.on("socket", (socket: Socket) => {
if (socket.connecting) {
socket.on("connect", () => {
request.on("socket", (socket: Socket) => {
if (socket.connecting) {
socket.on("connect", () => {
clearTimeout(timeoutId);
});
} else {
clearTimeout(timeoutId);
});
} else {
clearTimeout(timeoutId);
}
});
}
});
};

if (timeoutInMs < 2000) {
registerTimeout(0);
return 0;
}

return setTimeout(registerTimeout.bind(null, DEFER_EVENT_LISTENER_TIME), DEFER_EVENT_LISTENER_TIME);
};
6 changes: 3 additions & 3 deletions packages/node-http-handler/src/set-socket-keep-alive.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("setSocketKeepAlive", () => {
});

it("should set keepAlive to true", () => {
setSocketKeepAlive(request, { keepAlive: true });
setSocketKeepAlive(request, { keepAlive: true }, 0);

const setKeepAliveSpy = jest.spyOn(socket, "setKeepAlive");
request.emit("socket", socket);
Expand All @@ -25,7 +25,7 @@ describe("setSocketKeepAlive", () => {

it("should set keepAlive to true with custom initialDelay", () => {
const initialDelay = 5 * 1000;
setSocketKeepAlive(request, { keepAlive: true, keepAliveMsecs: initialDelay });
setSocketKeepAlive(request, { keepAlive: true, keepAliveMsecs: initialDelay }, 0);

const setKeepAliveSpy = jest.spyOn(socket, "setKeepAlive");
request.emit("socket", socket);
Expand All @@ -35,7 +35,7 @@ describe("setSocketKeepAlive", () => {
});

it("should not set keepAlive at all when keepAlive is false", () => {
setSocketKeepAlive(request, { keepAlive: false });
setSocketKeepAlive(request, { keepAlive: false }, 0);

const setKeepAliveSpy = jest.spyOn(socket, "setKeepAlive");
request.emit("socket", socket);
Expand Down
25 changes: 20 additions & 5 deletions packages/node-http-handler/src/set-socket-keep-alive.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,31 @@
import { ClientRequest } from "http";

const DEFER_EVENT_LISTENER_TIME = 3000;

export interface SocketKeepAliveOptions {
keepAlive: boolean;
keepAliveMsecs?: number;
}

export const setSocketKeepAlive = (request: ClientRequest, { keepAlive, keepAliveMsecs }: SocketKeepAliveOptions) => {
export const setSocketKeepAlive = (
request: ClientRequest,
{ keepAlive, keepAliveMsecs }: SocketKeepAliveOptions,
deferTimeMs = DEFER_EVENT_LISTENER_TIME
): NodeJS.Timeout | number => {
if (keepAlive !== true) {
return;
return -1;
}

const registerListener = () => {
request.on("socket", (socket) => {
socket.setKeepAlive(keepAlive, keepAliveMsecs || 0);
});
};

if (deferTimeMs === 0) {
registerListener();
return 0;
}

request.on("socket", (socket) => {
socket.setKeepAlive(keepAlive, keepAliveMsecs || 0);
});
return setTimeout(registerListener, deferTimeMs);
};
12 changes: 10 additions & 2 deletions packages/node-http-handler/src/set-socket-timeout.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ describe("setSocketTimeout", () => {

beforeEach(() => {
jest.clearAllMocks();
jest.useFakeTimers();
});

afterAll(() => {
jest.clearAllMocks();
jest.useRealTimers();
});

it(`sets the request's timeout if provided`, () => {
Expand All @@ -17,15 +23,17 @@ describe("setSocketTimeout", () => {
expect(clientRequest.setTimeout).toHaveBeenLastCalledWith(100, expect.any(Function));
});

it(`sets the request's timeout to 0 if not provided`, () => {
it(`sets the request's timeout to 0 if not provided`, async () => {
setSocketTimeout(clientRequest, jest.fn());

jest.runAllTimers();

expect(clientRequest.setTimeout).toHaveBeenCalledTimes(1);
expect(clientRequest.setTimeout).toHaveBeenLastCalledWith(0, expect.any(Function));
});

it(`destroys the request on timeout`, () => {
setSocketTimeout(clientRequest, jest.fn());
setSocketTimeout(clientRequest, jest.fn(), 1);
expect(clientRequest.destroy).not.toHaveBeenCalled();

// call setTimeout callback
Expand Down
30 changes: 24 additions & 6 deletions packages/node-http-handler/src/set-socket-timeout.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
import { ClientRequest } from "http";

export const setSocketTimeout = (request: ClientRequest, reject: (err: Error) => void, timeoutInMs = 0) => {
request.setTimeout(timeoutInMs, () => {
// destroy the request
request.destroy();
reject(Object.assign(new Error(`Connection timed out after ${timeoutInMs} ms`), { name: "TimeoutError" }));
});
const DEFER_EVENT_LISTENER_TIME = 3000;

export const setSocketTimeout = (
request: ClientRequest,
reject: (err: Error) => void,
timeoutInMs = 0
): NodeJS.Timeout | number => {
const registerTimeout = (offset: number) => {
request.setTimeout(timeoutInMs - offset, () => {
// destroy the request
request.destroy();
reject(Object.assign(new Error(`Connection timed out after ${timeoutInMs} ms`), { name: "TimeoutError" }));
});
};

if (0 < timeoutInMs && timeoutInMs < 6000) {
registerTimeout(0);
return 0;
}

return setTimeout(
registerTimeout.bind(null, timeoutInMs === 0 ? 0 : DEFER_EVENT_LISTENER_TIME),
DEFER_EVENT_LISTENER_TIME
);
};

0 comments on commit c86a02c

Please sign in to comment.