Skip to content

Commit

Permalink
Refactor request abort handling (#99)
Browse files Browse the repository at this point in the history
* refactor: remove use of deprecated response 'aborted' event

* refactor: Stop using deprecated request.abort

* refactor: better error messages for some failed test assertions

* refactor: unit test linter cleanup

* refactor: improve docstrings
  • Loading branch information
JoshMock authored May 23, 2024
1 parent 7622b1d commit 5c3438e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 89 deletions.
9 changes: 9 additions & 0 deletions src/Serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ export default class Serializer {
}
}

/**
* Serializes a record into a JSON string
*/
serialize (object: Record<string, any>): string {
debug('Serializing', object)
let json
Expand All @@ -54,6 +57,9 @@ export default class Serializer {
return json
}

/**
* Given a string, attempts to parse it from raw JSON into an object
*/
deserialize<T = unknown> (json: string): T {
debug('Deserializing', json)
let object
Expand All @@ -66,6 +72,9 @@ export default class Serializer {
return object
}

/**
* Serializes an array of records into an ndjson string
*/
ndserialize (array: Array<Record<string, any> | string>): string {
debug('ndserialize', array)
if (!Array.isArray(array)) {
Expand Down
43 changes: 14 additions & 29 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export default class HttpConnection extends BaseConnection {
}

const abortListener = (): void => {
request.abort()
request.destroy(new RequestAbortedError('Request aborted'))
}

this._openRequests++
Expand Down Expand Up @@ -169,10 +169,7 @@ export default class HttpConnection extends BaseConnection {
function onDataAsBuffer (chunk: Buffer): void {
currentLength += Buffer.byteLength(chunk)
if (currentLength > maxCompressedResponseSize) {
// TODO: hacky solution, refactor to avoid using the deprecated aborted event
response.removeListener('aborted', onAbort)
response.destroy()
onEnd(new RequestAbortedError(`The content length (${currentLength}) is bigger than the maximum allowed buffer (${maxCompressedResponseSize})`))
response.destroy(new RequestAbortedError(`The content length (${currentLength}) is bigger than the maximum allowed buffer (${maxCompressedResponseSize})`))
} else {
(payload as Buffer[]).push(chunk)
}
Expand All @@ -181,10 +178,7 @@ export default class HttpConnection extends BaseConnection {
function onDataAsString (chunk: string): void {
currentLength += Buffer.byteLength(chunk)
if (currentLength > maxResponseSize) {
// TODO: hacky solution, refactor to avoid using the deprecated aborted event
response.removeListener('aborted', onAbort)
response.destroy()
onEnd(new RequestAbortedError(`The content length (${currentLength}) is bigger than the maximum allowed string (${maxResponseSize})`))
response.destroy(new RequestAbortedError(`The content length (${currentLength}) is bigger than the maximum allowed string (${maxResponseSize})`))
} else {
payload = `${payload as string}${chunk}`
}
Expand All @@ -194,10 +188,14 @@ export default class HttpConnection extends BaseConnection {
response.removeListener('data', onData)
response.removeListener('end', onEnd)
response.removeListener('error', onEnd)
response.removeListener('aborted', onAbort)
request.removeListener('error', noop)

if (err != null) {
// @ts-expect-error
if (err.message === 'aborted' && err.code === 'ECONNRESET') {
response.destroy()
return reject(new ConnectionError('Response aborted while reading the body'))
}
if (err.name === 'RequestAbortedError') {
return reject(err)
}
Expand All @@ -211,11 +209,6 @@ export default class HttpConnection extends BaseConnection {
})
}

const onAbort = (): void => {
response.destroy()
onEnd(new Error('Response aborted while reading the body'))
}

if (!isCompressed && !isVectorTile) {
response.setEncoding('utf8')
}
Expand All @@ -224,36 +217,30 @@ export default class HttpConnection extends BaseConnection {
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
response.on('aborted', onAbort)
}

const onTimeout = (): void => {
cleanListeners()
this._openRequests--
request.once('error', () => {}) // we need to catch the request aborted error
request.abort()
request.destroy()
reject(new TimeoutError('Request timed out'))
}

const onError = (err: Error): void => {
cleanListeners()
this._openRequests--
let message = err.message
if (err.name === 'RequestAbortedError') {
return reject(err)
}
// @ts-expect-error
if (err.code === 'ECONNRESET') {
message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}`
}
reject(new ConnectionError(message))
}

const onAbort = (): void => {
cleanListeners()
request.once('error', () => {}) // we need to catch the request aborted error
debug('Request aborted', params)
this._openRequests--
reject(new RequestAbortedError('Request aborted'))
}

const onSocket = (socket: TLSSocket): void => {
/* istanbul ignore else */
if (!socket.isSessionReused()) {
Expand All @@ -263,15 +250,15 @@ export default class HttpConnection extends BaseConnection {
if (issuerCertificate == null) {
onError(new Error('Invalid or malformed certificate'))
request.once('error', () => {}) // we need to catch the request aborted error
return request.abort()
return request.destroy()
}

// Check if fingerprint matches
/* istanbul ignore else */
if (this[kCaFingerprint] !== issuerCertificate.fingerprint256) {
onError(new Error('Server certificate CA fingerprint does not match the value configured in caFingerprint'))
request.once('error', () => {}) // we need to catch the request aborted error
return request.abort()
return request.destroy()
}
})
}
Expand All @@ -280,7 +267,6 @@ export default class HttpConnection extends BaseConnection {
request.on('response', onResponse)
request.on('timeout', onTimeout)
request.on('error', onError)
request.on('abort', onAbort)
if (this[kCaFingerprint] != null && requestParams.protocol === 'https:') {
request.on('socket', onSocket)
}
Expand Down Expand Up @@ -308,7 +294,6 @@ export default class HttpConnection extends BaseConnection {
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('error', onError)
request.removeListener('abort', onAbort)
request.removeListener('socket', onSocket)
if (options.signal != null) {
if ('removeEventListener' in options.signal) {
Expand Down
10 changes: 5 additions & 5 deletions src/pool/BaseConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ export default class BaseConnectionPool {
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
* @param connection Connection options, or the URL of a node
* @returns This ConnectionPool instance
*/
addConnection (connection: AddConnectionOptions | AddConnectionOptions[]): this {
if (Array.isArray(connection)) {
Expand All @@ -160,10 +160,10 @@ export default class BaseConnectionPool {
}

/**
* Removes a new connection to the pool.
* Removes a connection from the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
* @param connection The connection to remove
* @returns This ConnectionPool instance
*/
removeConnection (connection: Connection): this {
debug('Removing connection', connection)
Expand Down
Loading

0 comments on commit 5c3438e

Please sign in to comment.