diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 986cce46b6..0837c54d3f 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -237,6 +237,8 @@ export class Connection extends TypedEventEmitter { .on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); + + this.messageStream.pause(); } public get hello() { @@ -651,6 +653,7 @@ export class Connection extends TypedEventEmitter { private async *readMany(): AsyncGenerator { try { this.dataEvents = onData(this.messageStream); + this.messageStream.resume(); for await (const message of this.dataEvents) { const response = await decompressResponse(message); yield response; @@ -661,6 +664,7 @@ export class Connection extends TypedEventEmitter { } } finally { this.dataEvents = null; + this.messageStream.pause(); this.throwIfAborted(); } } diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 75d5c246f2..05e66f3dcf 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,11 +1,15 @@ +import { Socket } from 'node:net'; + import { expect } from 'chai'; import * as sinon from 'sinon'; +import { setTimeout } from 'timers/promises'; import { connect, Connection, isHello, MongoClientAuthProviders, + MongoDBCollectionNamespace, MongoNetworkTimeoutError, ns } from '../../mongodb'; @@ -142,4 +146,181 @@ describe('new Connection()', function () { expect(beforeHandshakeSymbol).to.be.a('symbol'); expect(error).to.have.property(beforeHandshakeSymbol, true); }); + + describe('NODE-6370: regression test', function () { + class MockSocket extends Socket { + override write(_data: string | Buffer) { + return false; + } + } + + let socket: MockSocket; + let connection: Connection; + + this.timeout(10_000); + + beforeEach(function () { + socket = new MockSocket(); + connection = new Connection(socket, {}); + }); + + const validResponse = Buffer.from( + 'a30000002a0800004b010000dd07000000000000008e000000016f6b00000000000000f03f0324636c757374657254696d65005800000011636c757374657254696d65001c00000093f6f266037369676e61747572650033000000056861736800140000000072d8d6eab4e0703d2d50846e2db7adb5d2733cc4126b65794964000200000026f6f2660000116f7065726174696f6e54696d65001c00000093f6f26600', + 'hex' + ); + + const chunks = [validResponse.slice(0, 10), validResponse.slice(10)]; + + describe('when data is emitted before drain', function () { + describe('first command', function () { + describe('when there is no delay between data and drain', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + + socket.emit('data', validResponse); + socket.emit('drain'); + + await result$; + }); + }); + + describe('when there is a delay between data and drain', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + socket.emit('data', validResponse); + + await setTimeout(10); + + socket.emit('drain'); + await result$; + }); + }); + + describe('when the data comes in multiple chunks', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + socket.emit('data', chunks[0]); + + await setTimeout(10); + socket.emit('drain'); + + socket.emit('data', chunks[1]); + + await result$; + }); + }); + }); + + describe('not first command', function () { + beforeEach(async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + socket.emit('drain'); + socket.emit('data', validResponse); + + await result$; + }); + + describe('when there is no delay between data and drain', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + socket.emit('data', validResponse); + + // await setTimeout(0); + // await setTimeout(10); + socket.emit('drain'); + await result$; + }); + }); + + describe('when there is a delay between data and drain', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + socket.emit('data', validResponse); + + await setTimeout(10); + // await setTimeout(10); + socket.emit('drain'); + await result$; + }); + }); + + describe('when the data comes in multiple chunks', function () { + it('does not hang', async function () { + const result$ = connection.command( + MongoDBCollectionNamespace.fromString('foo.bar'), + { ping: 1 }, + {} + ); + + // there is an await in writeCommand, we must move the event loop forward just enough + // so that we reach the `await drain`. Otherwise, we'll emit both data and drain before + // listeners are attached. + await setTimeout(0); + + socket.emit('data', chunks[0]); + + await setTimeout(10); + + socket.emit('drain'); + + socket.emit('data', chunks[1]); + await result$; + }); + }); + }); + }); + }); });