From 37caf1a5edbc0ff1e4b16e1ca7a9d48373404f28 Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Thu, 22 Jun 2023 14:24:27 +0200 Subject: [PATCH] Timeouts rework (part 2) (#168) --- coverage/badge.svg | 2 +- src/connection/adapter/base_http_adapter.ts | 56 +++++++++------------ src/connection/adapter/http_adapter.ts | 2 +- src/connection/adapter/https_adapter.ts | 2 +- 4 files changed, 28 insertions(+), 34 deletions(-) diff --git a/coverage/badge.svg b/coverage/badge.svg index 855f0bd4..4dbcdb23 100644 --- a/coverage/badge.svg +++ b/coverage/badge.svg @@ -1 +1 @@ -coverage: 92.36%coverage92.36% \ No newline at end of file +coverage: 92.36%coverage92.36% diff --git a/src/connection/adapter/base_http_adapter.ts b/src/connection/adapter/base_http_adapter.ts index cbd51a93..cdd8ae60 100644 --- a/src/connection/adapter/base_http_adapter.ts +++ b/src/connection/adapter/base_http_adapter.ts @@ -22,6 +22,7 @@ import { getAsText, isStream } from '../../utils' import type { ClickHouseSettings } from '../../settings' import { getUserAgent } from '../../utils/user_agent' import * as uuid from 'uuid' +import type * as net from 'net' export interface RequestParams { method: 'GET' | 'POST' @@ -106,29 +107,18 @@ export abstract class BaseHttpAdapter implements Connection { protected abstract createClientRequest( params: RequestParams, - abort_signal: AbortSignal + abort_signal?: AbortSignal ): Http.ClientRequest protected async request(params: RequestParams): Promise { return new Promise((resolve, reject) => { const start = Date.now() - const abortController = new AbortController() - let isTimedOut = false - const timeout = setTimeout(() => { - isTimedOut = true - abortController.abort() - }, this.config.request_timeout).unref() - - const request = this.createClientRequest(params, abortController.signal) + const request = this.createClientRequest(params, params.abort_signal) function onError(err: Error): void { removeRequestListeners() - if (isTimedOut) { - reject(new Error('Request timed out')) - } else { - reject(err) - } + reject(err) } const onResponse = async ( @@ -159,11 +149,7 @@ export abstract class BaseHttpAdapter implements Connection { * see the full sequence of events https://nodejs.org/api/http.html#httprequesturl-options-callback * */ }) - if (isTimedOut) { - reject(new Error('Timeout error')) - } else { - reject(new Error('The request was aborted.')) - } + reject(new Error('The request was aborted.')) } function onClose(): void { @@ -173,35 +159,43 @@ export abstract class BaseHttpAdapter implements Connection { removeRequestListeners() } - function onUserAbortSignal(): void { - abortController.abort() + const config = this.config + function onSocket(socket: net.Socket): void { + // Force KeepAlive usage (workaround due to Node.js bug) + // https://github.com/nodejs/node/issues/47137#issuecomment-1477075229 + socket.setKeepAlive(true, 1000) + socket.setTimeout(config.request_timeout, onTimeout) + } + + function onTimeout(): void { + removeRequestListeners() + request.destroy() + reject(new Error('Timeout error')) } function removeRequestListeners(): void { - clearTimeout(timeout) + if (request.socket !== null) { + request.socket.setTimeout(0) // reset previously set timeout + request.socket.removeListener('timeout', onTimeout) + } + request.removeListener('socket', onSocket) request.removeListener('response', onResponse) request.removeListener('error', onError) request.removeListener('close', onClose) if (params.abort_signal !== undefined) { - params.abort_signal.removeEventListener('abort', onUserAbortSignal) + request.removeListener('abort', onAbort) } - abortController.signal.removeEventListener('abort', onAbort) } + request.on('socket', onSocket) request.on('response', onResponse) request.on('error', onError) request.on('close', onClose) if (params.abort_signal !== undefined) { - params.abort_signal.addEventListener('abort', onUserAbortSignal, { - once: true, - }) + params.abort_signal.addEventListener('abort', onAbort, { once: true }) } - abortController.signal.addEventListener('abort', onAbort, { - once: true, - }) - if (!params.body) return request.end() const bodyStream = isStream(params.body) diff --git a/src/connection/adapter/http_adapter.ts b/src/connection/adapter/http_adapter.ts index e2009855..ffad35f4 100644 --- a/src/connection/adapter/http_adapter.ts +++ b/src/connection/adapter/http_adapter.ts @@ -16,7 +16,7 @@ export class HttpAdapter extends BaseHttpAdapter implements Connection { protected createClientRequest( params: RequestParams, - abort_signal: AbortSignal + abort_signal?: AbortSignal ): Http.ClientRequest { return Http.request(params.url, { method: params.method, diff --git a/src/connection/adapter/https_adapter.ts b/src/connection/adapter/https_adapter.ts index 1d69df1c..e89e676a 100644 --- a/src/connection/adapter/https_adapter.ts +++ b/src/connection/adapter/https_adapter.ts @@ -39,7 +39,7 @@ export class HttpsAdapter extends BaseHttpAdapter implements Connection { protected createClientRequest( params: RequestParams, - abort_signal: AbortSignal + abort_signal?: AbortSignal ): Http.ClientRequest { return Https.request(params.url, { method: params.method,