From 1e31bfe2baaabca04e0c2548588d347f92b69aa6 Mon Sep 17 00:00:00 2001 From: Garr Date: Sat, 21 Sep 2024 14:10:32 -0700 Subject: [PATCH] fix(NODE-6370) response messages to large commands can be lost under load --- src/cmap/connection.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 986cce46b6..9debdcec68 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -434,12 +434,14 @@ export class Connection extends TypedEventEmitter { } try { - await this.writeCommand(message, { + const drain = await this.writeCommand(message, { agreedCompressor: this.description.compressor ?? 'none', zlibCompressionLevel: this.description.zlibCompressionLevel }); if (options.noResponse || message.moreToCome) { + // if writing multiple messages, make sure we drain + if (drain) await once(this.socket, 'drain'); yield MongoDBResponse.empty; return; } @@ -618,13 +620,15 @@ export class Connection extends TypedEventEmitter { /** * @internal * - * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method - * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). + * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. + * This method does not wait until the socket's buffer has emptied but returns true if the + * caller should. Awaiting the `drain` event can result in losts received messages, because + * the 'data' event is not yet handled. */ private async writeCommand( command: WriteProtocolMessageType, options: { agreedCompressor?: CompressorName; zlibCompressionLevel?: number } - ): Promise { + ): Promise { const finalCommand = options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) ? command @@ -635,8 +639,7 @@ export class Connection extends TypedEventEmitter { const buffer = Buffer.concat(await finalCommand.toBin()); - if (this.socket.write(buffer)) return; - return await once(this.socket, 'drain'); + return !this.socket.write(buffer); } /**