Skip to content

Commit

Permalink
Revert "fix(NODE-4834): ensure that MessageStream is destroyed when c…
Browse files Browse the repository at this point in the history
…onnections are destroyed (#3482)"

This reverts commit 8338bae.
  • Loading branch information
nbbeeken committed Feb 6, 2023
1 parent f845a34 commit 385b22d
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 278 deletions.
2 changes: 1 addition & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ function performInitialHandshake(
) {
const callback: Callback<Document> = function (err, ret) {
if (err && conn) {
conn.destroy({ force: false });
conn.destroy();
}
_callback(err, ret);
};
Expand Down
53 changes: 37 additions & 16 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export interface ConnectionOptions
/** @public */
export interface DestroyOptions {
/** Force the destruction. */
force: boolean;
force?: boolean;
}

/** @public */
Expand All @@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
destroyed: boolean;
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
Expand Down Expand Up @@ -204,6 +204,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
this[kHello] = null;
this[kClusterTime] = null;

Expand Down Expand Up @@ -296,7 +297,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}
this.destroy({ force: false });

this[kStream].destroy(error);

this.closed = true;

for (const op of this[kQueue].values()) {
op.cb(error);
Expand All @@ -310,7 +314,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}
this.destroy({ force: false });

this.closed = true;

const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
Expand All @@ -327,7 +332,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

this[kDelayedTimeoutId] = setTimeout(() => {
this.destroy({ force: false });
this[kStream].destroy();

this.closed = true;

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
Expand Down Expand Up @@ -436,27 +443,41 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
callback(undefined, message.documents[0]);
}

destroy(options: DestroyOptions, callback?: Callback): void {
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}

this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);

this[kMessageStream].destroy();
this.closed = true;
options = Object.assign({ force: false }, options);
if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

return;
}

if (options.force) {
this[kStream].destroy();
if (callback) {
return process.nextTick(callback);
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}

return;
}

if (!this[kStream].writableEnded) {
this[kStream].end(callback);
} else {
if (callback) {
return process.nextTick(callback);
this[kStream].end(() => {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
}
});
}

command(
Expand Down
4 changes: 2 additions & 2 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
conn.destroy(options, cb);
},
err => {
this[kConnections].clear();
Expand Down Expand Up @@ -586,7 +586,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
new ConnectionClosedEvent(this, connection, reason)
);
// destroy the connection
process.nextTick(() => connection.destroy({ force: false }));
process.nextTick(() => connection.destroy());
}

private connectionIsStale(connection: Connection) {
Expand Down
5 changes: 1 addition & 4 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {

/** Destroy the server connection */
destroy(options?: DestroyOptions, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = { force: false };
}
if (typeof options === 'function') (callback = options), (options = {});
options = Object.assign({}, { force: false }, options);

if (this.s.state === STATE_CLOSED) {
Expand Down
17 changes: 13 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,17 +466,26 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Close this topology */
close(callback: Callback): void;
close(options: CloseOptions): void;
close(options: CloseOptions, callback: Callback): void;
close(options?: CloseOptions, callback?: Callback): void {
options = options ?? { force: false };
close(options?: CloseOptions | Callback, callback?: Callback): void {
if (typeof options === 'function') {
callback = options;
options = {};
}

if (typeof options === 'boolean') {
options = { force: options };
}
options = options ?? {};

if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return callback?.();
}

const destroyedServers = Array.from(this.s.servers.values(), server => {
return promisify(destroyServer)(server, this, { force: !!options?.force });
return promisify(destroyServer)(server, this, options as CloseOptions);
});

Promise.all(destroyedServers)
Expand Down Expand Up @@ -731,7 +740,7 @@ function destroyServer(
options?: DestroyOptions,
callback?: Callback
) {
options = options ?? { force: false };
options = options ?? {};
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}
Expand Down
69 changes: 49 additions & 20 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const { ReadPreference } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { formatSort } = require('../../mongodb');
const { getSymbolFrom } = require('../../tools/utils');
const { MongoExpiredSessionError } = require('../../mongodb');

describe('Cursor', function () {
before(function () {
Expand Down Expand Up @@ -1868,31 +1867,61 @@ describe('Cursor', function () {
}
});

it('closes cursors when client is closed even if it has not been exhausted', async function () {
await client
.db()
.dropCollection('test_cleanup_tailable')
.catch(() => null);
it('should close dead tailable cursors', {
metadata: {
os: '!win32' // NODE-2943: timeout on windows
},

const collection = await client
.db()
.createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 });
test: function (done) {
// http://www.mongodb.org/display/DOCS/Tailable+Cursors

// insert only 2 docs in capped coll of 3
await collection.insertMany([{ a: 1 }, { a: 1 }]);
const configuration = this.configuration;
client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 });
const db = client.db(configuration.db);
const options = { capped: true, size: 10000000 };
db.createCollection(
'test_if_dead_tailable_cursors_close',
options,
function (err, collection) {
expect(err).to.not.exist;

await cursor.next();
await cursor.next();
// will block for maxAwaitTimeMS (except we are closing the client)
const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error);
let closeCount = 0;
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => {
expect(err).to.not.exist;

await client.close();
expect(cursor).to.have.property('killed', true);
const cursor = collection.find({}, { tailable: true, awaitData: true });
const stream = cursor.stream();

stream.resume();

var validator = () => {
closeCount++;
if (closeCount === 2) {
done();
}
};

// we validate that the stream "ends" either cleanly or with an error
stream.on('end', validator);
stream.on('error', validator);

cursor.on('close', validator);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.instanceOf(MongoExpiredSessionError);
const docs = Array.from({ length: 100 }).map(() => ({ a: 1 }));
collection.insertMany(docs, err => {
expect(err).to.not.exist;

setTimeout(() => client.close());
});
});
}
);
});
}
});

it('shouldAwaitData', {
Expand Down
18 changes: 5 additions & 13 deletions test/integration/node-specific/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@ describe('Topology', function () {
const states = [];
topology.on('stateChanged', (_, newState) => states.push(newState));
topology.connect(err => {
try {
expect(err).to.not.exist;
topology.close(err => {
expect(err).to.not.exist;
} catch (error) {
done(error);
}
topology.close({}, err => {
try {
expect(err).to.not.exist;
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
} catch (error) {
done(error);
}
expect(topology.isDestroyed()).to.be.true;
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
done();
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe('Polling Srv Records for Mongos Discovery', () => {

afterEach(function (done) {
if (context.topology) {
context.topology.close({}, done);
context.topology.close(done);
} else {
done();
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/assorted/server_selection_spec_helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) {
});

function done(err) {
topology.close({}, e => testDone(e || err));
topology.close(e => testDone(e || err));
}

topology.connect(err => {
Expand Down
Loading

0 comments on commit 385b22d

Please sign in to comment.