Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-6394): pause message stream when not listening for events #4249

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
.on('error', this.onError.bind(this));
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

this.messageStream.pause();
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
}

public get hello() {
Expand Down Expand Up @@ -651,6 +653,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
try {
this.dataEvents = onData(this.messageStream);
this.messageStream.resume();
for await (const message of this.dataEvents) {
const response = await decompressResponse(message);
yield response;
Expand All @@ -661,6 +664,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}
} finally {
this.dataEvents = null;
this.messageStream.pause();
this.throwIfAborted();
}
}
Expand Down
181 changes: 181 additions & 0 deletions test/unit/cmap/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { Socket } from 'node:net';

import { expect } from 'chai';
import * as sinon from 'sinon';
import { setTimeout } from 'timers/promises';

import {
connect,
Connection,
isHello,
MongoClientAuthProviders,
MongoDBCollectionNamespace,
MongoNetworkTimeoutError,
ns
} from '../../mongodb';
Expand Down Expand Up @@ -142,4 +146,181 @@ describe('new Connection()', function () {
expect(beforeHandshakeSymbol).to.be.a('symbol');
expect(error).to.have.property(beforeHandshakeSymbol, true);
});

describe('NODE-6370: regression test', function () {
class MockSocket extends Socket {
override write(_data: string | Buffer) {
return false;
}
}

let socket: MockSocket;
let connection: Connection;

this.timeout(10_000);

beforeEach(function () {
socket = new MockSocket();
connection = new Connection(socket, {});
});

const validResponse = Buffer.from(
'a30000002a0800004b010000dd07000000000000008e000000016f6b00000000000000f03f0324636c757374657254696d65005800000011636c757374657254696d65001c00000093f6f266037369676e61747572650033000000056861736800140000000072d8d6eab4e0703d2d50846e2db7adb5d2733cc4126b65794964000200000026f6f2660000116f7065726174696f6e54696d65001c00000093f6f26600',
'hex'
);

const chunks = [validResponse.slice(0, 10), validResponse.slice(10)];

describe('when data is emitted before drain', function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
describe('first command', function () {
describe('when there is no delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);
// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
await setTimeout(0);

socket.emit('data', validResponse);
socket.emit('drain');

await result$;
});
});

describe('when there is a delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

await setTimeout(10);

socket.emit('drain');
await result$;
});
});

describe('when the data comes in multiple chunks', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', chunks[0]);

await setTimeout(10);
socket.emit('drain');

socket.emit('data', chunks[1]);

await result$;
});
});
});

describe('not first command', function () {
beforeEach(async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('drain');
socket.emit('data', validResponse);

await result$;
});

describe('when there is no delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

// await setTimeout(0);
// await setTimeout(10);
socket.emit('drain');
await result$;
});
});

describe('when there is a delay between data and drain', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);
socket.emit('data', validResponse);

await setTimeout(10);
// await setTimeout(10);
socket.emit('drain');
await result$;
});
});

describe('when the data comes in multiple chunks', function () {
it('does not hang', async function () {
const result$ = connection.command(
MongoDBCollectionNamespace.fromString('foo.bar'),
{ ping: 1 },
{}
);

// there is an await in writeCommand, we must move the event loop forward just enough
// so that we reach the `await drain`. Otherwise, we'll emit both data and drain before
// listeners are attached.
await setTimeout(0);

socket.emit('data', chunks[0]);

await setTimeout(10);

socket.emit('drain');

socket.emit('data', chunks[1]);
await result$;
});
});
});
});
});
});