Skip to content

Commit

Permalink
Timeouts rework (part 2) (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Jun 22, 2023
1 parent d8cc6ce commit 37caf1a
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 34 deletions.
2 changes: 1 addition & 1 deletion coverage/badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
56 changes: 25 additions & 31 deletions src/connection/adapter/base_http_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { getAsText, isStream } from '../../utils'
import type { ClickHouseSettings } from '../../settings'
import { getUserAgent } from '../../utils/user_agent'
import * as uuid from 'uuid'
import type * as net from 'net'

export interface RequestParams {
method: 'GET' | 'POST'
Expand Down Expand Up @@ -106,29 +107,18 @@ export abstract class BaseHttpAdapter implements Connection {

protected abstract createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest

protected async request(params: RequestParams): Promise<Stream.Readable> {
return new Promise((resolve, reject) => {
const start = Date.now()

const abortController = new AbortController()
let isTimedOut = false
const timeout = setTimeout(() => {
isTimedOut = true
abortController.abort()
}, this.config.request_timeout).unref()

const request = this.createClientRequest(params, abortController.signal)
const request = this.createClientRequest(params, params.abort_signal)

function onError(err: Error): void {
removeRequestListeners()
if (isTimedOut) {
reject(new Error('Request timed out'))
} else {
reject(err)
}
reject(err)
}

const onResponse = async (
Expand Down Expand Up @@ -159,11 +149,7 @@ export abstract class BaseHttpAdapter implements Connection {
* see the full sequence of events https://nodejs.org/api/http.html#httprequesturl-options-callback
* */
})
if (isTimedOut) {
reject(new Error('Timeout error'))
} else {
reject(new Error('The request was aborted.'))
}
reject(new Error('The request was aborted.'))
}

function onClose(): void {
Expand All @@ -173,35 +159,43 @@ export abstract class BaseHttpAdapter implements Connection {
removeRequestListeners()
}

function onUserAbortSignal(): void {
abortController.abort()
const config = this.config
function onSocket(socket: net.Socket): void {
// Force KeepAlive usage (workaround due to Node.js bug)
// https://github.com/nodejs/node/issues/47137#issuecomment-1477075229
socket.setKeepAlive(true, 1000)
socket.setTimeout(config.request_timeout, onTimeout)
}

function onTimeout(): void {
removeRequestListeners()
request.destroy()
reject(new Error('Timeout error'))
}

function removeRequestListeners(): void {
clearTimeout(timeout)
if (request.socket !== null) {
request.socket.setTimeout(0) // reset previously set timeout
request.socket.removeListener('timeout', onTimeout)
}
request.removeListener('socket', onSocket)
request.removeListener('response', onResponse)
request.removeListener('error', onError)
request.removeListener('close', onClose)
if (params.abort_signal !== undefined) {
params.abort_signal.removeEventListener('abort', onUserAbortSignal)
request.removeListener('abort', onAbort)
}
abortController.signal.removeEventListener('abort', onAbort)
}

request.on('socket', onSocket)
request.on('response', onResponse)
request.on('error', onError)
request.on('close', onClose)

if (params.abort_signal !== undefined) {
params.abort_signal.addEventListener('abort', onUserAbortSignal, {
once: true,
})
params.abort_signal.addEventListener('abort', onAbort, { once: true })
}

abortController.signal.addEventListener('abort', onAbort, {
once: true,
})

if (!params.body) return request.end()

const bodyStream = isStream(params.body)
Expand Down
2 changes: 1 addition & 1 deletion src/connection/adapter/http_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class HttpAdapter extends BaseHttpAdapter implements Connection {

protected createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest {
return Http.request(params.url, {
method: params.method,
Expand Down
2 changes: 1 addition & 1 deletion src/connection/adapter/https_adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class HttpsAdapter extends BaseHttpAdapter implements Connection {

protected createClientRequest(
params: RequestParams,
abort_signal: AbortSignal
abort_signal?: AbortSignal
): Http.ClientRequest {
return Https.request(params.url, {
method: params.method,
Expand Down

0 comments on commit 37caf1a

Please sign in to comment.