diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index a69289befa..e3f44cc910 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -96,7 +96,7 @@ function performInitialHandshake( ) { const callback: Callback = function (err, ret) { if (err && conn) { - conn.destroy({ force: false }); + conn.destroy(); } _callback(err, ret); }; diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index f2d7fec5fe..f642192827 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -133,7 +133,7 @@ export interface ConnectionOptions /** @public */ export interface DestroyOptions { /** Force the destruction. */ - force: boolean; + force?: boolean; } /** @public */ @@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter { address: string; socketTimeoutMS: number; monitorCommands: boolean; - /** Indicates that the connection (including underlying TCP socket) has been closed. */ closed: boolean; + destroyed: boolean; lastHelloMS?: number; serverApi?: ServerApi; helloOk?: boolean; @@ -204,6 +204,7 @@ export class Connection extends TypedEventEmitter { this.monitorCommands = options.monitorCommands; this.serverApi = options.serverApi; this.closed = false; + this.destroyed = false; this[kHello] = null; this[kClusterTime] = null; @@ -296,7 +297,10 @@ export class Connection extends TypedEventEmitter { if (this.closed) { return; } - this.destroy({ force: false }); + + this[kStream].destroy(error); + + this.closed = true; for (const op of this[kQueue].values()) { op.cb(error); @@ -310,7 +314,8 @@ export class Connection extends TypedEventEmitter { if (this.closed) { return; } - this.destroy({ force: false }); + + this.closed = true; const message = `connection ${this.id} to ${this.address} closed`; for (const op of this[kQueue].values()) { @@ -327,7 +332,9 @@ export class Connection extends TypedEventEmitter { } this[kDelayedTimeoutId] = setTimeout(() => { - this.destroy({ force: false }); + this[kStream].destroy(); + + this.closed = true; const message = `connection ${this.id} to ${this.address} timed out`; const beforeHandshake = this.hello == null; @@ -436,27 +443,41 @@ export class Connection extends TypedEventEmitter { callback(undefined, message.documents[0]); } - destroy(options: DestroyOptions, callback?: Callback): void { + destroy(options?: DestroyOptions, callback?: Callback): void { + if (typeof options === 'function') { + callback = options; + options = { force: false }; + } + this.removeAllListeners(Connection.PINNED); this.removeAllListeners(Connection.UNPINNED); - this[kMessageStream].destroy(); - this.closed = true; + options = Object.assign({ force: false }, options); + if (this[kStream] == null || this.destroyed) { + this.destroyed = true; + if (typeof callback === 'function') { + callback(); + } + + return; + } if (options.force) { this[kStream].destroy(); - if (callback) { - return process.nextTick(callback); + this.destroyed = true; + if (typeof callback === 'function') { + callback(); } + + return; } - if (!this[kStream].writableEnded) { - this[kStream].end(callback); - } else { - if (callback) { - return process.nextTick(callback); + this[kStream].end(() => { + this.destroyed = true; + if (typeof callback === 'function') { + callback(); } - } + }); } command( diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index ca3270db5c..e52fc9081d 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -510,7 +510,7 @@ export class ConnectionPool extends TypedEventEmitter { ConnectionPool.CONNECTION_CLOSED, new ConnectionClosedEvent(this, conn, 'poolClosed') ); - conn.destroy({ force: !!options.force }, cb); + conn.destroy(options, cb); }, err => { this[kConnections].clear(); @@ -586,7 +586,7 @@ export class ConnectionPool extends TypedEventEmitter { new ConnectionClosedEvent(this, connection, reason) ); // destroy the connection - process.nextTick(() => connection.destroy({ force: false })); + process.nextTick(() => connection.destroy()); } private connectionIsStale(connection: Connection) { diff --git a/src/sdam/server.ts b/src/sdam/server.ts index 4ed6c1cb9c..e9679de9a2 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -241,10 +241,7 @@ export class Server extends TypedEventEmitter { /** Destroy the server connection */ destroy(options?: DestroyOptions, callback?: Callback): void { - if (typeof options === 'function') { - callback = options; - options = { force: false }; - } + if (typeof options === 'function') (callback = options), (options = {}); options = Object.assign({}, { force: false }, options); if (this.s.state === STATE_CLOSED) { diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 3b447a6f00..a862326913 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -466,17 +466,26 @@ export class Topology extends TypedEventEmitter { } /** Close this topology */ + close(callback: Callback): void; close(options: CloseOptions): void; close(options: CloseOptions, callback: Callback): void; - close(options?: CloseOptions, callback?: Callback): void { - options = options ?? { force: false }; + close(options?: CloseOptions | Callback, callback?: Callback): void { + if (typeof options === 'function') { + callback = options; + options = {}; + } + + if (typeof options === 'boolean') { + options = { force: options }; + } + options = options ?? {}; if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) { return callback?.(); } const destroyedServers = Array.from(this.s.servers.values(), server => { - return promisify(destroyServer)(server, this, { force: !!options?.force }); + return promisify(destroyServer)(server, this, options as CloseOptions); }); Promise.all(destroyedServers) @@ -731,7 +740,7 @@ function destroyServer( options?: DestroyOptions, callback?: Callback ) { - options = options ?? { force: false }; + options = options ?? {}; for (const event of LOCAL_SERVER_EVENTS) { server.removeAllListeners(event); } diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index e0cc7a77d5..3e16a7eeef 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -13,7 +13,6 @@ const { ReadPreference } = require('../../mongodb'); const { ServerType } = require('../../mongodb'); const { formatSort } = require('../../mongodb'); const { getSymbolFrom } = require('../../tools/utils'); -const { MongoExpiredSessionError } = require('../../mongodb'); describe('Cursor', function () { before(function () { @@ -1868,31 +1867,61 @@ describe('Cursor', function () { } }); - it('closes cursors when client is closed even if it has not been exhausted', async function () { - await client - .db() - .dropCollection('test_cleanup_tailable') - .catch(() => null); + it('should close dead tailable cursors', { + metadata: { + os: '!win32' // NODE-2943: timeout on windows + }, - const collection = await client - .db() - .createCollection('test_cleanup_tailable', { capped: true, size: 1000, max: 3 }); + test: function (done) { + // http://www.mongodb.org/display/DOCS/Tailable+Cursors - // insert only 2 docs in capped coll of 3 - await collection.insertMany([{ a: 1 }, { a: 1 }]); + const configuration = this.configuration; + client.connect((err, client) => { + expect(err).to.not.exist; + this.defer(() => client.close()); - const cursor = collection.find({}, { tailable: true, awaitData: true, maxAwaitTimeMS: 2000 }); + const db = client.db(configuration.db); + const options = { capped: true, size: 10000000 }; + db.createCollection( + 'test_if_dead_tailable_cursors_close', + options, + function (err, collection) { + expect(err).to.not.exist; - await cursor.next(); - await cursor.next(); - // will block for maxAwaitTimeMS (except we are closing the client) - const rejectedEarlyBecauseClientClosed = cursor.next().catch(error => error); + let closeCount = 0; + const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); + collection.insertMany(docs, { w: 'majority', wtimeoutMS: 5000 }, err => { + expect(err).to.not.exist; - await client.close(); - expect(cursor).to.have.property('killed', true); + const cursor = collection.find({}, { tailable: true, awaitData: true }); + const stream = cursor.stream(); + + stream.resume(); + + var validator = () => { + closeCount++; + if (closeCount === 2) { + done(); + } + }; + + // we validate that the stream "ends" either cleanly or with an error + stream.on('end', validator); + stream.on('error', validator); + + cursor.on('close', validator); - const error = await rejectedEarlyBecauseClientClosed; - expect(error).to.be.instanceOf(MongoExpiredSessionError); + const docs = Array.from({ length: 100 }).map(() => ({ a: 1 })); + collection.insertMany(docs, err => { + expect(err).to.not.exist; + + setTimeout(() => client.close()); + }); + }); + } + ); + }); + } }); it('shouldAwaitData', { diff --git a/test/integration/node-specific/topology.test.js b/test/integration/node-specific/topology.test.js index 912c1443c4..ee806b9691 100644 --- a/test/integration/node-specific/topology.test.js +++ b/test/integration/node-specific/topology.test.js @@ -10,20 +10,12 @@ describe('Topology', function () { const states = []; topology.on('stateChanged', (_, newState) => states.push(newState)); topology.connect(err => { - try { + expect(err).to.not.exist; + topology.close(err => { expect(err).to.not.exist; - } catch (error) { - done(error); - } - topology.close({}, err => { - try { - expect(err).to.not.exist; - expect(topology.isDestroyed()).to.be.true; - expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); - done(); - } catch (error) { - done(error); - } + expect(topology.isDestroyed()).to.be.true; + expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']); + done(); }); }); } diff --git a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts index 8d8b2b8412..27e680fc75 100644 --- a/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts +++ b/test/unit/assorted/polling_srv_records_for_mongos_discovery.prose.test.ts @@ -103,7 +103,7 @@ describe('Polling Srv Records for Mongos Discovery', () => { afterEach(function (done) { if (context.topology) { - context.topology.close({}, done); + context.topology.close(done); } else { done(); } diff --git a/test/unit/assorted/server_selection_spec_helper.js b/test/unit/assorted/server_selection_spec_helper.js index 32b2147eb1..7c895eba5d 100644 --- a/test/unit/assorted/server_selection_spec_helper.js +++ b/test/unit/assorted/server_selection_spec_helper.js @@ -109,7 +109,7 @@ function executeServerSelectionTest(testDefinition, testDone) { }); function done(err) { - topology.close({}, e => testDone(e || err)); + topology.close(e => testDone(e || err)); } topology.connect(err => { diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index c510f1521d..c1801cf9e6 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -31,7 +31,6 @@ const connectionOptionsDefaults = { /** The absolute minimum socket API needed by Connection as of writing this test */ class FakeSocket extends EventEmitter { - writableEnded: boolean; address() { // is never called } @@ -40,14 +39,6 @@ class FakeSocket extends EventEmitter { } destroy() { // is called, has no side effects - this.writableEnded = true; - } - end(cb) { - this.writableEnded = true; - // nextTick to simulate I/O delay - if (typeof cb === 'function') { - process.nextTick(cb); - } } get remoteAddress() { return 'iLoveJavaScript'; @@ -57,20 +48,6 @@ class FakeSocket extends EventEmitter { } } -class InputStream extends Readable { - writableEnded: boolean; - constructor(options?) { - super(options); - } - - end(cb) { - this.writableEnded = true; - if (typeof cb === 'function') { - process.nextTick(cb); - } - } -} - describe('new Connection()', function () { let server; after(() => mock.cleanup()); @@ -129,7 +106,7 @@ describe('new Connection()', function () { expect(err).to.be.instanceOf(MongoNetworkTimeoutError); expect(result).to.not.exist; - expect(conn).property('stream').property('writableEnded', true); + expect(conn).property('stream').property('destroyed', true); done(); }); @@ -198,7 +175,7 @@ describe('new Connection()', function () { context('when multiple hellos exist on the stream', function () { let callbackSpy; - const inputStream = new InputStream(); + const inputStream = new Readable(); const document = { ok: 1 }; const last = { isWritablePrimary: true }; @@ -417,7 +394,7 @@ describe('new Connection()', function () { connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); kDelayedTimeoutId = getSymbolFrom(connection, 'delayedTimeoutId'); - messageStream = sinon.spy(connection[messageStreamSymbol]); + messageStream = connection[messageStreamSymbol]; }); afterEach(() => { @@ -430,15 +407,13 @@ describe('new Connection()', function () { driverSocket.emit('timeout'); expect(connection.onTimeout).to.have.been.calledOnce; - expect(connection.destroy).to.not.have.been.called; expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); expect(connection).to.have.property('closed', false); - expect(driverSocket.end).to.not.have.been.called; + expect(driverSocket.destroy).to.not.have.been.called; clock.tick(1); - expect(driverSocket.end).to.have.been.calledOnce; - expect(connection.destroy).to.have.been.calledOnce; + expect(driverSocket.destroy).to.have.been.calledOnce; expect(connection).to.have.property('closed', true); }); @@ -463,88 +438,6 @@ describe('new Connection()', function () { expect(connection).to.have.property('closed', false); expect(connection).to.have.property(kDelayedTimeoutId, null); }); - - it('destroys the message stream and socket', () => { - expect(connection).to.have.property(kDelayedTimeoutId, null); - - driverSocket.emit('timeout'); - - clock.tick(1); - - expect(connection.onTimeout).to.have.been.calledOnce; - expect(connection).to.have.property(kDelayedTimeoutId).that.is.instanceOf(NodeJSTimeoutClass); - - expect(messageStream.destroy).to.have.been.calledOnce; - expect(driverSocket.destroy).to.not.have.been.called; - expect(driverSocket.end).to.have.been.calledOnce; - }); - }); - - describe('onError()', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: sinon.SinonSpiedInstance; - let messageStream: MessageStream; - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); - driverSocket = sinon.spy(new FakeSocket()); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = sinon.spy(connection[messageStreamSymbol]); - }); - - afterEach(() => { - timerSandbox.restore(); - clock.restore(); - }); - - it('destroys the message stream and socket', () => { - messageStream.emit('error'); - clock.tick(1); - expect(connection.onError).to.have.been.calledOnce; - connection.destroy({ force: false }); - clock.tick(1); - expect(messageStream.destroy).to.have.been.called; - expect(driverSocket.destroy).to.not.have.been.called; - expect(driverSocket.end).to.have.been.calledOnce; - }); - }); - - describe('onClose()', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: sinon.SinonSpiedInstance; - let messageStream: MessageStream; - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); - - driverSocket = sinon.spy(new FakeSocket()); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = sinon.spy(connection[messageStreamSymbol]); - }); - - afterEach(() => { - timerSandbox.restore(); - clock.restore(); - }); - - it('destroys the message stream and socket', () => { - driverSocket.emit('close'); - clock.tick(1); - expect(connection.onClose).to.have.been.calledOnce; - connection.destroy({ force: false }); - clock.tick(1); - expect(messageStream.destroy).to.have.been.called; - expect(driverSocket.destroy).to.not.have.been.called; - expect(driverSocket.end).to.have.been.calledOnce; - }); }); describe('.hasSessionSupport', function () { @@ -598,96 +491,4 @@ describe('new Connection()', function () { }); }); }); - - describe('destroy()', () => { - let connection: sinon.SinonSpiedInstance; - let clock: sinon.SinonFakeTimers; - let timerSandbox: sinon.SinonFakeTimers; - let driverSocket: sinon.SinonSpiedInstance; - let messageStream: MessageStream; - beforeEach(() => { - timerSandbox = createTimerSandbox(); - clock = sinon.useFakeTimers(); - - driverSocket = sinon.spy(new FakeSocket()); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - const messageStreamSymbol = getSymbolFrom(connection, 'messageStream'); - messageStream = sinon.spy(connection[messageStreamSymbol]); - }); - - afterEach(() => { - timerSandbox.restore(); - clock.restore(); - }); - - context('when options.force == true', function () { - it('calls stream.destroy', () => { - connection.destroy({ force: true }); - clock.tick(1); - expect(driverSocket.destroy).to.have.been.calledOnce; - }); - - it('does not call stream.end', () => { - connection.destroy({ force: true }); - clock.tick(1); - expect(driverSocket.end).to.not.have.been.called; - }); - - it('destroys the tcp socket', () => { - connection.destroy({ force: true }); - clock.tick(1); - expect(driverSocket.destroy).to.have.been.calledOnce; - }); - - it('destroys the messageStream', () => { - connection.destroy({ force: true }); - clock.tick(1); - expect(messageStream.destroy).to.have.been.calledOnce; - }); - - it('calls stream.destroy whenever destroy is called ', () => { - connection.destroy({ force: true }); - connection.destroy({ force: true }); - connection.destroy({ force: true }); - clock.tick(1); - expect(driverSocket.destroy).to.have.been.calledThrice; - }); - }); - - context('when options.force == false', function () { - it('calls stream.end', () => { - connection.destroy({ force: false }); - clock.tick(1); - expect(driverSocket.end).to.have.been.calledOnce; - }); - - it('does not call stream.destroy', () => { - connection.destroy({ force: false }); - clock.tick(1); - expect(driverSocket.destroy).to.not.have.been.called; - }); - - it('ends the tcp socket', () => { - connection.destroy({ force: false }); - clock.tick(1); - expect(driverSocket.end).to.have.been.calledOnce; - }); - - it('destroys the messageStream', () => { - connection.destroy({ force: false }); - clock.tick(1); - expect(messageStream.destroy).to.have.been.calledOnce; - }); - - it('calls stream.end exactly once when destroy is called multiple times', () => { - connection.destroy({ force: false }); - connection.destroy({ force: false }); - connection.destroy({ force: false }); - connection.destroy({ force: false }); - clock.tick(1); - expect(driverSocket.end).to.have.been.calledOnce; - }); - }); - }); }); diff --git a/test/unit/error.test.ts b/test/unit/error.test.ts index 0a6b05cd99..d0cd53bb22 100644 --- a/test/unit/error.test.ts +++ b/test/unit/error.test.ts @@ -381,7 +381,7 @@ describe('MongoErrors', () => { makeAndConnectReplSet((err, topology) => { // cleanup the server before calling done - const cleanup = err => topology.close({}, err2 => done(err || err2)); + const cleanup = err => topology.close(err2 => done(err || err2)); if (err) { return cleanup(err); diff --git a/test/unit/sdam/monitor.test.ts b/test/unit/sdam/monitor.test.ts index ecb62a5de9..497183ecb6 100644 --- a/test/unit/sdam/monitor.test.ts +++ b/test/unit/sdam/monitor.test.ts @@ -56,7 +56,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close({}, done as any); + topology.close(done as any); }, 500); }); }).skipReason = 'TODO(NODE-3819): Unskip flaky tests'; @@ -96,7 +96,7 @@ describe('monitoring', function () { const serverDescription = Array.from(topology.description.servers.values())[0]; expect(serverDescription).property('roundTripTime').to.be.greaterThan(0); - topology.close({}, done); + topology.close(done); }); }).skipReason = 'TODO(NODE-3600): Unskip flaky tests'; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 68c52b878a..5ca93bd3d4 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -26,7 +26,7 @@ describe('Topology (unit)', function () { } if (topology) { - topology.close({}); + topology.close(); } }); @@ -107,7 +107,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.true; - topology.close({}, done); + topology.close(done); }); }); @@ -127,7 +127,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close({}, done); + topology.close(done); }); }); @@ -147,7 +147,7 @@ describe('Topology (unit)', function () { topology.connect(() => { expect(topology.shouldCheckForSessionSupport()).to.be.false; - topology.close({}, done); + topology.close(done); }); }); }); @@ -182,7 +182,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.match(/timed out/); - topology.close({}, done); + topology.close(done); }); }); }); @@ -325,7 +325,7 @@ describe('Topology (unit)', function () { expect(err).to.exist; expect(err).to.eql(serverDescription.error); expect(poolCleared).to.be.false; - topology.close({}, done); + topology.close(done); }); }); }); @@ -467,7 +467,7 @@ describe('Topology (unit)', function () { it('should clean up listeners on close', function (done) { topology.s.state = 'connected'; // fake state to test clean up logic - topology.close({}, e => { + topology.close(e => { const srvPollerListeners = topology.s.srvPoller.listeners( SrvPoller.SRV_RECORD_DISCOVERY ); @@ -547,7 +547,7 @@ describe('Topology (unit)', function () { // occurs `requestCheck` will be called for an immediate check. expect(requestCheck).property('callCount').to.equal(1); - topology.close({}, done); + topology.close(done); }); }); }); @@ -559,7 +559,7 @@ describe('Topology (unit)', function () { this.emit('connect'); }); - topology.close({}, () => { + topology.close(() => { topology.selectServer(ReadPreference.primary, { serverSelectionTimeoutMS: 2000 }, err => { expect(err).to.exist; expect(err).to.match(/Topology is closed/);