From b1f296ffb435b97320d91703796647b968aa4d4e Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Fri, 23 Feb 2018 15:17:31 -0500 Subject: [PATCH] fix(sessions): move active session tracking to topology base (#1665) Moves the tracking of active sessions to the topology base. Doing this allows us to ensure that all active and pooled sessions are ended when the topology closes, and that implicit sessions are tracked. Also adds a test case to make sure none of our unit tests are leaking sessions, and corrects many leaky tests. Also bumps version of mongodb-core Part of HELP-5384 --- .eslintrc | 3 +- lib/mongo_client.js | 16 +- lib/topologies/mongos.js | 2 + lib/topologies/replset.js | 19 +- lib/topologies/server.js | 2 + lib/topologies/topology_base.js | 20 +- package.json | 3 +- test/functional/apm_tests.js | 8 + test/functional/crud_api_tests.js | 2 +- test/functional/crud_spec_tests.js | 6 + test/functional/cursor_tests.js | 14 +- test/functional/cursorstream_tests.js | 26 +- test/functional/db_tests.js | 3 + test/functional/gridfs_stream_tests.js | 25 ++ test/functional/index_tests.js | 97 ++------ test/functional/insert_tests.js | 13 +- test/functional/operation_example_tests.js | 11 +- .../operation_generators_example_tests.js | 225 ++++++++---------- .../operation_promises_example_tests.js | 15 +- test/functional/session_leak_test.js | 92 +++++++ .../sharding_read_preference_tests.js | 21 +- test/unit/sessions/client_tests.js | 1 + test/unit/sessions/collection_tests.js | 2 + 23 files changed, 372 insertions(+), 254 deletions(-) create mode 100644 test/functional/session_leak_test.js diff --git a/.eslintrc b/.eslintrc index 2440ddd908..283970bdeb 100644 --- a/.eslintrc +++ b/.eslintrc @@ -7,7 +7,8 @@ "mocha": true }, "globals": { - "Promise": true + "Promise": true, + "Set": true }, "parserOptions": { "ecmaVersion": 2017 diff --git a/lib/mongo_client.js b/lib/mongo_client.js index 46813d36e7..04a1637539 100644 --- a/lib/mongo_client.js +++ b/lib/mongo_client.js @@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) { // Remove listeners after emit self.removeAllListeners('close'); - // If we have sessions, we want to send a single `endSessions` command for them, - // and then individually clean them up. They will be removed from the internal state - // when they emit their `ended` events. - if (this.s.sessions.length) { - this.topology.endSessions(this.s.sessions); - this.s.sessions.forEach(session => session.endSession({ skipCommand: true })); - } - // Callback after next event loop tick if (typeof callback === 'function') return process.nextTick(function() { @@ -507,13 +499,7 @@ MongoClient.prototype.startSession = function(options) { throw new MongoError('Current topology does not support sessions'); } - const session = this.topology.startSession(options); - session.once('ended', () => { - this.s.sessions = this.s.sessions.filter(s => s.equals(session)); - }); - - this.s.sessions.push(session); - return session; + return this.topology.startSession(options); }; var mergeOptions = function(target, source, flatten) { diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index e10168d1c5..b556b36195 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -190,6 +190,8 @@ class Mongos extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: options.promiseLibrary || Promise }; diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index ad46c17df7..63fe02db05 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -206,6 +206,8 @@ class ReplSet extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: options.promiseLibrary || Promise }; @@ -371,22 +373,9 @@ class ReplSet extends TopologyBase { } close(forceClosed) { - var self = this; - // Call destroy on the topology - this.s.coreTopology.destroy({ - force: typeof forceClosed === 'boolean' ? forceClosed : false - }); - - // We need to wash out all stored processes - if (forceClosed === true) { - this.s.storeOptions.force = forceClosed; - this.s.store.flush(); - } + super.close(forceClosed); - var events = ['timeout', 'error', 'close', 'joined', 'left']; - events.forEach(function(e) { - self.removeAllListeners(e); - }); + ['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e)); } } diff --git a/lib/topologies/server.js b/lib/topologies/server.js index 5823a11961..cbf4c78ebd 100644 --- a/lib/topologies/server.js +++ b/lib/topologies/server.js @@ -198,6 +198,8 @@ class Server extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: promiseLibrary || Promise }; diff --git a/lib/topologies/topology_base.js b/lib/topologies/topology_base.js index a5999b8e87..c1654d1094 100644 --- a/lib/topologies/topology_base.js +++ b/lib/topologies/topology_base.js @@ -290,7 +290,13 @@ class TopologyBase extends EventEmitter { } startSession(options) { - return new ClientSession(this, this.s.sessionPool, options); + const session = new ClientSession(this, this.s.sessionPool, options); + session.once('ended', () => { + this.s.sessions = this.s.sessions.filter(s => !s.equals(session)); + }); + + this.s.sessions.push(session); + return session; } endSessions(sessions, callback) { @@ -388,6 +394,18 @@ class TopologyBase extends EventEmitter { } close(forceClosed) { + // If we have sessions, we want to send a single `endSessions` command for them, + // and then individually clean them up. They will be removed from the internal state + // when they emit their `ended` events. + if (this.s.sessions.length) { + this.endSessions(this.s.sessions.map(session => session.id)); + this.s.sessions.forEach(session => session.endSession({ skipCommand: true })); + } + + if (this.s.sessionPool) { + this.s.sessionPool.endAllPooledSessions(); + } + this.s.coreTopology.destroy({ force: typeof forceClosed === 'boolean' ? forceClosed : false }); diff --git a/package.json b/package.json index 246856de5f..fce01c145a 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "official" ], "dependencies": { - "mongodb-core": "3.0.2" + "mongodb-core": "3.0.3" }, "devDependencies": { "betterbenchmarks": "^0.1.0", @@ -32,6 +32,7 @@ "mongodb-test-runner": "^1.1.18", "prettier": "^1.5.3", "semver": "5.4.1", + "sinon": "^4.3.0", "worker-farm": "^1.5.0" }, "author": "Christian Kvalheim", diff --git a/test/functional/apm_tests.js b/test/functional/apm_tests.js index 41438dfc8e..f53a908d7e 100644 --- a/test/functional/apm_tests.js +++ b/test/functional/apm_tests.js @@ -1036,6 +1036,10 @@ describe('APM', function() { // Get the result result = results.successes.shift(); + if (result.commandName === 'endSessions') { + result = results.successes.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); // Do we have a getMore command @@ -1054,6 +1058,10 @@ describe('APM', function() { results.failures = filterSessionsCommands(results.failures); result = results.failures.shift(); + if (result.commandName === 'endSessions') { + result = results.failures.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); } diff --git a/test/functional/crud_api_tests.js b/test/functional/crud_api_tests.js index de6d0246c4..de791c83d4 100644 --- a/test/functional/crud_api_tests.js +++ b/test/functional/crud_api_tests.js @@ -830,7 +830,7 @@ describe('CRUD API', function() { test.equal(null, err); // Delete all items with no selector - db.collection('t6_1').deleteMany(function(err) { + db.collection('t6_1').deleteMany({}, function(err) { test.equal(null, err); client.close(); diff --git a/test/functional/crud_spec_tests.js b/test/functional/crud_spec_tests.js index c8c03552b5..28a9154942 100644 --- a/test/functional/crud_spec_tests.js +++ b/test/functional/crud_spec_tests.js @@ -39,6 +39,12 @@ describe('CRUD spec', function() { }); }); + afterEach(() => { + if (testContext.client) { + testContext.client.close(); + } + }); + describe('read', function() { readScenarios.forEach(function(scenarioData) { var scenarioName = scenarioData[0]; diff --git a/test/functional/cursor_tests.js b/test/functional/cursor_tests.js index f660e0e7f8..1b9396d984 100644 --- a/test/functional/cursor_tests.js +++ b/test/functional/cursor_tests.js @@ -1728,6 +1728,7 @@ describe('Cursor', function() { test.equal(1, items.length); test.equal(2, items[0].a); test.equal(undefined, items[0].x); + client.close(); done(); }); }); @@ -2296,9 +2297,9 @@ describe('Cursor', function() { if (count === 0) { var stream = collection.find({}, { tailable: true, awaitData: true }).stream(); - + // let index = 0; stream.on('data', function() { - // console.log("doc :: " + (index++)); + // console.log('doc :: ' + index++); }); stream.on('error', function(err) { @@ -2319,14 +2320,17 @@ describe('Cursor', function() { // Just hammer the server for (var i = 0; i < 100; i++) { + const id = i; process.nextTick(function() { - collection.insert({ id: i }, function(err) { + collection.insert({ id }, function(err) { test.equal(null, err); + + if (id === 99) { + setTimeout(() => client.close()); + } }); }); } - - setTimeout(() => client.close(), 800); } }); } diff --git a/test/functional/cursorstream_tests.js b/test/functional/cursorstream_tests.js index 7bcd6c29e6..3b30e80e9a 100644 --- a/test/functional/cursorstream_tests.js +++ b/test/functional/cursorstream_tests.js @@ -70,9 +70,16 @@ describe('Cursor Streams', function() { // When the stream is done stream.on('end', function() { - expect(data).to.have.length(3000); - client.close(); - done(); + setTimeout(() => { + let err; + try { + expect(data).to.have.length(3000); + } catch (e) { + err = e; + } + client.close(); + done(err); + }, 1000); }); } }); @@ -139,9 +146,16 @@ describe('Cursor Streams', function() { // When the stream is done stream.on('end', function() { - expect(data).to.have.length(10000); - client.close(); - done(); + setTimeout(() => { + let err; + try { + expect(data).to.have.length(10000); + } catch (e) { + err = e; + } + client.close(); + done(err); + }, 1000); }); } }); diff --git a/test/functional/db_tests.js b/test/functional/db_tests.js index e2f967801a..2a118286c7 100644 --- a/test/functional/db_tests.js +++ b/test/functional/db_tests.js @@ -98,11 +98,13 @@ describe('Db', function() { coll.findOne({}, null, function() { //e - errors b/c findOne needs a query selector test.equal(1, count); + client.close(); done(); }); } catch (e) { process.nextTick(function() { test.equal(1, count); + client.close(); done(); }); } @@ -465,6 +467,7 @@ describe('Db', function() { return c.collectionName; }); test.notEqual(-1, collections.indexOf('node972.test')); + client.close(); done(); }); }); diff --git a/test/functional/gridfs_stream_tests.js b/test/functional/gridfs_stream_tests.js index 134292bed7..46de0ce4ce 100644 --- a/test/functional/gridfs_stream_tests.js +++ b/test/functional/gridfs_stream_tests.js @@ -81,6 +81,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(indexes.length, 2); test.equal(indexes[1].name, 'files_id_1_n_1'); + client.close(); done(); }); }); @@ -166,6 +167,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(indexes.length, 2); test.equal(indexes[1].name, 'files_id_1_n_1'); + client.close(); done(); }); }); @@ -237,6 +239,7 @@ describe('GridFS Stream', function() { var hash = crypto.createHash('md5'); hash.update(license); test.equal(docs[0].md5, hash.digest('hex')); + client.close(); done(); }); }); @@ -283,6 +286,7 @@ describe('GridFS Stream', function() { downloadStream.on('error', function(err) { test.equal('ENOENT', err.code); + client.close(); client.close(); done(); }); @@ -333,6 +337,7 @@ describe('GridFS Stream', function() { downloadStream.on('end', function() { test.ok(gotData); + client.close(); done(); }); }); @@ -401,6 +406,7 @@ describe('GridFS Stream', function() { // care that we got between 1 and 3, and got the right result test.ok(gotData >= 1 && gotData <= 3); test.equal(str, 'pache'); + client.close(); done(); }); }); @@ -459,6 +465,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -521,6 +528,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -582,6 +590,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -642,6 +651,7 @@ describe('GridFS Stream', function() { downloadStream.on('end', function() { test.equal(downloadStream.s.cursor, null); if (finished.close) { + client.close(); return done(); } finished.end = true; @@ -649,6 +659,7 @@ describe('GridFS Stream', function() { downloadStream.on('close', function() { if (finished.end) { + client.close(); return done(); } finished.close = true; @@ -712,6 +723,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -756,6 +768,7 @@ describe('GridFS Stream', function() { sort: { _id: 1 } }); + client.close(); done(); }); // END @@ -811,6 +824,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -870,6 +884,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -918,6 +933,7 @@ describe('GridFS Stream', function() { bucket.find({}, { batchSize: 1 }).toArray(function(err, files) { test.equal(null, err); test.equal(1, files.length); + client.close(); done(); }); }); @@ -965,6 +981,7 @@ describe('GridFS Stream', function() { // Rename the file bucket.rename(id, 'renamed_it.dat', function(err) { expect(err).to.not.exist; + client.close(); done(); }); }); @@ -1009,6 +1026,7 @@ describe('GridFS Stream', function() { // As per spec, make sure we didn't actually fire a query // because the document length is 0 test.equal(stream.s.cursor, null); + client.close(); done(); }); }); @@ -1059,6 +1077,7 @@ describe('GridFS Stream', function() { } if (--num === 0) { + client.close(); done(); } }); @@ -1105,9 +1124,11 @@ describe('GridFS Stream', function() { download.on('error', function(error) { if (!testSpec.assert.error) { test.ok(false); + client.close(); done(); } test.ok(error.toString().indexOf(testSpec.assert.error) !== -1); + client.close(); done(); }); @@ -1115,10 +1136,12 @@ describe('GridFS Stream', function() { var result = testSpec.assert.result; if (!result) { test.ok(false); + client.close(); done(); } test.equal(res.toString('hex'), result.$hex); + client.close(); done(); }); }; @@ -1282,6 +1305,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -1352,6 +1376,7 @@ describe('GridFS Stream', function() { // care that we got between 1 and 3, and got the right result test.ok(gotData >= 1 && gotData <= 3); test.equal(str, 'pache'); + client.close(); done(); }); }); diff --git a/test/functional/index_tests.js b/test/functional/index_tests.js index 6ad31e13ce..2682a33cc5 100644 --- a/test/functional/index_tests.js +++ b/test/functional/index_tests.js @@ -270,68 +270,6 @@ describe('Indexes', function() { } }); - /** - * @ignore - */ - it('shouldThrowErrorOnAttemptingSafeCreateIndexWithNoCallback', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - // The actual test we wish to run - test: function(done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - var db = client.db(configuration.db); - db.createCollection('shouldThrowErrorOnAttemptingSafeUpdateWithNoCallback', function( - err, - collection - ) { - try { - // insert a doc - collection.createIndex({ a: 1 }, configuration.writeConcernMax()); - test.ok(false); - } catch (err) {} // eslint-disable-line - - client.close(); - done(); - }); - }); - } - }); - - /** - * @ignore - */ - it('shouldThrowErrorOnAttemptingSafeEnsureIndexWithNoCallback', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - // The actual test we wish to run - test: function(done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - var db = client.db(configuration.db); - db.createCollection('shouldThrowErrorOnAttemptingSafeUpdateWithNoCallback', function( - err, - collection - ) { - try { - // insert a doc - collection.ensureIndex({ a: 1 }, configuration.writeConcernMax()); - test.ok(false); - } catch (err) {} // eslint-disable-line - - client.close(); - done(); - }); - }); - } - }); - /** * @ignore */ @@ -782,10 +720,16 @@ describe('Indexes', function() { collection.ensureIndex({ a: 1 }, configuration.writeConcernMax(), function(err) { test.equal(null, err); - collection.dropIndex('a_1'); - - client.close(); - done(); + collection + .dropIndex('a_1') + .then(() => { + client.close(); + done(); + }) + .catch(err => { + client.close(); + done(err); + }); }); }); }); @@ -1109,7 +1053,10 @@ describe('Indexes', function() { * @ignore */ it('should correctly error out due to driver close', { - metadata: { requires: { topology: ['single'] } }, + metadata: { + requires: { topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function(done) { @@ -1118,14 +1065,16 @@ describe('Indexes', function() { client.connect(function(err, client) { var db = client.db(configuration.db); client.close(function() { - db.createCollection('nonexisting', { w: 1 }, function(err) { - test.ok(err != null); - db.collection('nonexisting', { strict: true }, function(err) { + setTimeout(() => { + db.createCollection('nonexisting', { w: 1 }, function(err) { test.ok(err != null); - db.collection('nonexisting', { strict: false }, function(err) { - // When set to false (default) it should not create an error - test.ok(err === null); - done(); + db.collection('nonexisting', { strict: true }, function(err) { + test.ok(err != null); + db.collection('nonexisting', { strict: false }, function(err) { + // When set to false (default) it should not create an error + test.ok(err === null); + setTimeout(() => done()); + }); }); }); }); diff --git a/test/functional/insert_tests.js b/test/functional/insert_tests.js index 183fba4560..e2fa98399f 100644 --- a/test/functional/insert_tests.js +++ b/test/functional/insert_tests.js @@ -1656,9 +1656,16 @@ describe('Insert', function() { client.connect(function(err, client) { var db = client.db(configuration.db); var collection = db.collection('shouldExecuteInsertWithNoCallbackAndWriteConcern'); - collection.insert({ a: { b: { c: 1 } } }); - client.close(); - done(); + collection.insert({ a: { b: { c: 1 } } }).then( + () => { + client.close(); + done(); + }, + err => { + client.close(); + done(err); + } + ); }); } }); diff --git a/test/functional/operation_example_tests.js b/test/functional/operation_example_tests.js index 903e12e590..2efb1a0aed 100644 --- a/test/functional/operation_example_tests.js +++ b/test/functional/operation_example_tests.js @@ -3170,7 +3170,8 @@ describe('Operation Examples', function() { */ it('shouldCorrectlyRenameCollection', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } + requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }, + sessions: { skipLeakTests: true } }, // The actual test we wish to run @@ -4352,12 +4353,14 @@ describe('Operation Examples', function() { test.ok(result); test.equal(null, err); + const oldClient = client; // Authenticate MongoClient.connect( 'mongodb://user:name@localhost:27017/integration_tests', function(err, client) { expect(err).to.exist; expect(client).to.not.exist; + oldClient.close(); done(); } ); @@ -8970,8 +8973,10 @@ describe('Operation Examples', function() { // When the stream is done stream.on('end', function() { - client.close(); - done(); + setTimeout(() => { + client.close(); + done(); + }, 1000); }); }); }); diff --git a/test/functional/operation_generators_example_tests.js b/test/functional/operation_generators_example_tests.js index ddd2d5ea67..2ca94f17cc 100644 --- a/test/functional/operation_generators_example_tests.js +++ b/test/functional/operation_generators_example_tests.js @@ -1620,15 +1620,11 @@ describe('Operation (Generators)', function() { // Create a simple single field index yield collection.ensureIndex({ a: 1 }, configuration.writeConcernMax()); - setTimeout(function() { - return co(function*() { - // List all of the indexes on the collection - var indexes = yield collection.indexes(); - test.equal(3, indexes.length); + // List all of the indexes on the collection + var indexes = yield collection.indexes(); + test.equal(3, indexes.length); - client.close(); - }); - }, 1000); + client.close(); }); // END } @@ -1844,17 +1840,11 @@ describe('Operation (Generators)', function() { // BEGIN var collection = db.collection('simple_document_insert_collection_no_safe_with_generators'); // Insert a single document - collection.insertOne({ hello: 'world_no_safe' }); - - // Wait for a second before finishing up, to ensure we have written the item to disk - setTimeout(function() { - return co(function*() { - // Fetch the document - var item = yield collection.findOne({ hello: 'world_no_safe' }); - test.equal('world_no_safe', item.hello); - client.close(); - }); - }, 100); + yield collection.insertOne({ hello: 'world_no_safe' }); + + var item = yield collection.findOne({ hello: 'world_no_safe' }); + test.equal('world_no_safe', item.hello); + client.close(); }); // END } @@ -2346,7 +2336,10 @@ describe('Operation (Generators)', function() { * @ignore */ it('shouldCorrectlyRenameCollectionWithGenerators', { - metadata: { requires: { generators: true, topology: ['single'] } }, + metadata: { + requires: { generators: true, topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function() { @@ -2477,17 +2470,12 @@ describe('Operation (Generators)', function() { // Fetch the collection var collection = db.collection('save_a_simple_document_with_generators'); // Save a document with no safe option - collection.save({ hello: 'world' }); - - // Wait for a second - setTimeout(function() { - return co(function*() { - // Find the saved document - var item = yield collection.findOne({ hello: 'world' }); - test.equal('world', item.hello); - client.close(); - }); - }, 2000); + yield collection.save({ hello: 'world' }); + + // Find the saved document + var item = yield collection.findOne({ hello: 'world' }); + test.equal('world', item && item.hello); + client.close(); }); // END } @@ -2591,19 +2579,14 @@ describe('Operation (Generators)', function() { yield collection.insertOne({ a: 1 }, configuration.writeConcernMax()); // Update the document with an atomic operator - collection.updateOne({ a: 1 }, { $set: { b: 2 } }); + yield collection.updateOne({ a: 1 }, { $set: { b: 2 } }); - // Wait for a second then fetch the document - setTimeout(function() { - return co(function*() { - // Fetch the document that we modified - var item = yield collection.findOne({ a: 1 }); - test.equal(1, item.a); - test.equal(2, item.b); + var item = yield collection.findOne({ a: 1 }); - client.close(); - }); - }, 1000); + test.equal(1, item.a); + test.equal(2, item.b); + + client.close(); }); // END } @@ -2947,54 +2930,52 @@ describe('Operation (Generators)', function() { readPreference: ReadPreference.PRIMARY }); - setTimeout(function() { - return co(function*() { - // Locate the entry - var collection = db.collection('test_eval_with_generators'); - var item = yield collection.findOne(); - test.equal(5, item.y); - tests_done(); - - // Evaluate a function with 2 parameters passed in - var result = yield db.eval('function (x, y) {return x + y;}', [2, 3]); - test.equal(5, result); - tests_done(); - - // Evaluate a function with no parameters passed in - result = yield db.eval('function () {return 5;}'); - test.equal(5, result); - tests_done(); - - // Evaluate a statement - result = yield db.eval('2 + 3;'); - test.equal(5, result); - tests_done(); - - // Evaluate a statement using the code object - result = yield db.eval(new Code('2 + 3;')); - test.equal(5, result); - tests_done(); - - // Evaluate a statement using the code object including a scope - result = yield db.eval(new Code('return i;', { i: 2 })); - test.equal(2, result); - tests_done(); - - // Evaluate a statement using the code object including a scope - result = yield db.eval(new Code('i + 3;', { i: 2 })); - test.equal(5, result); - tests_done(); - - try { - // Evaluate an illegal statement - yield db.eval('5 ++ 5;'); - } catch (err) { - test.ok(err instanceof Error); - test.ok(err.message != null); - tests_done(); - } - }); - }, 1000); + yield new Promise(resolve => setTimeout(resolve, 1000)); + + // Locate the entry + var collection = db.collection('test_eval_with_generators'); + var item = yield collection.findOne(); + test.equal(5, item.y); + tests_done(); + + // Evaluate a function with 2 parameters passed in + result = yield db.eval('function (x, y) {return x + y;}', [2, 3]); + test.equal(5, result); + tests_done(); + + // Evaluate a function with no parameters passed in + result = yield db.eval('function () {return 5;}'); + test.equal(5, result); + tests_done(); + + // Evaluate a statement + result = yield db.eval('2 + 3;'); + test.equal(5, result); + tests_done(); + + // Evaluate a statement using the code object + result = yield db.eval(new Code('2 + 3;')); + test.equal(5, result); + tests_done(); + + // Evaluate a statement using the code object including a scope + result = yield db.eval(new Code('return i;', { i: 2 })); + test.equal(2, result); + tests_done(); + + // Evaluate a statement using the code object including a scope + result = yield db.eval(new Code('i + 3;', { i: 2 })); + test.equal(5, result); + tests_done(); + + try { + // Evaluate an illegal statement + yield db.eval('5 ++ 5;'); + } catch (err) { + test.ok(err instanceof Error); + test.ok(err.message != null); + tests_done(); + } }); // END } @@ -3621,26 +3602,23 @@ describe('Operation (Generators)', function() { yield db.dropDatabase(); // Wait two seconds to let it replicate across - setTimeout(function() { - return co(function*() { - // Get the admin database - var dbs = yield db.admin().listDatabases(); - // Grab the databases - dbs = dbs.databases; - // Did we find the db - var found = false; - - // Check if we have the db in the list - for (var i = 0; i < dbs.length; i++) { - if (dbs[i].name === 'integration_tests_to_drop') found = true; - } + yield new Promise(resolve => setTimeout(resolve, 2000)); + // Get the admin database + var dbs = yield db.admin().listDatabases(); + // Grab the databases + dbs = dbs.databases; + // Did we find the db + var found = false; + + // Check if we have the db in the list + for (var i = 0; i < dbs.length; i++) { + if (dbs[i].name === 'integration_tests_to_drop') found = true; + } - // We should not find the databases - if (process.env['JENKINS'] == null) test.equal(false, found); + // We should not find the databases + if (process.env['JENKINS'] == null) test.equal(false, found); - client.close(); - }); - }, 2000); + client.close(); }); // END } @@ -6560,24 +6538,27 @@ describe('Operation (Generators)', function() { // Insert a document in the capped collection yield collection.insertMany(docs, configuration.writeConcernMax()); - var total = 0; - // Get the cursor - var cursor = collection - .find({}) - .addCursorFlag('tailable', true) - .addCursorFlag('awaitData', true); + yield new Promise(resolve => { + var total = 0; + // Get the cursor + var cursor = collection + .find({}) + .addCursorFlag('tailable', true) + .addCursorFlag('awaitData', true); - cursor.on('data', function() { - total = total + 1; + cursor.on('data', function() { + total = total + 1; - if (total === 1000) { - cursor.kill(); - } - }); + if (total === 1000) { + cursor.kill(); + } + }); - cursor.on('end', function() { - client.close(); + cursor.on('end', function() { + client.close(); + resolve(); + }); }); }); // END diff --git a/test/functional/operation_promises_example_tests.js b/test/functional/operation_promises_example_tests.js index d3d1ed584c..824469a473 100644 --- a/test/functional/operation_promises_example_tests.js +++ b/test/functional/operation_promises_example_tests.js @@ -2431,7 +2431,10 @@ describe('Operation (Promises)', function() { * @ignore */ it('shouldCorrectlyRenameCollectionWithPromises', { - metadata: { requires: { promises: true, topology: ['single'] } }, + metadata: { + requires: { promises: true, topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function() { @@ -2538,8 +2541,14 @@ describe('Operation (Promises)', function() { }) .then(function(count) { test.equal(2, count); - client.close(); - }); + }) + .then( + () => client.close(), + e => { + client.close(); + throw e; + } + ); }); // END /* eslint-enable */ diff --git a/test/functional/session_leak_test.js b/test/functional/session_leak_test.js new file mode 100644 index 0000000000..554737e3c9 --- /dev/null +++ b/test/functional/session_leak_test.js @@ -0,0 +1,92 @@ +'use strict'; + +const expect = require('chai').expect; +const sinon = require('sinon'); +const core = require('mongodb-core'); +const MongoClient = require('../../lib/mongo_client'); +const ServerSessionPool = core.Sessions.ServerSessionPool; + +const sandbox = sinon.createSandbox(); + +let activeSessions, pooledSessions, activeSessionsBeforeClose; + +function getSessionLeakMetadata(currentTest) { + return (currentTest.metadata && currentTest.metadata.sessions) || {}; +} + +beforeEach('Session Leak Before Each - Set up clean test environment', () => { + sandbox.restore(); + activeSessions = new Set(); + pooledSessions = new Set(); + activeSessionsBeforeClose = new Set(); +}); + +beforeEach('Session Leak Before Each - setup session tracking', function() { + if (getSessionLeakMetadata(this.currentTest).skipLeakTests) { + return; + } + + const _acquire = ServerSessionPool.prototype.acquire; + sandbox.stub(ServerSessionPool.prototype, 'acquire').callsFake(function() { + const session = _acquire.apply(this, arguments); + activeSessions.add(session.id); + // console.log(`Active + ${JSON.stringify(session.id)} = ${activeSessions.size}`); + return session; + }); + + const _release = ServerSessionPool.prototype.release; + sandbox.stub(ServerSessionPool.prototype, 'release').callsFake(function(session) { + const id = session.id; + activeSessions.delete(id); + // console.log(`Active - ${JSON.stringify(id)} = ${activeSessions.size}`); + pooledSessions.add(id); + // console.log(`Pooled + ${JSON.stringify(id)} = ${activeSessions.size}`); + return _release.apply(this, arguments); + }); + + [core.Server, core.ReplSet, core.Mongos].forEach(topology => { + const _endSessions = topology.prototype.endSessions; + sandbox.stub(topology.prototype, 'endSessions').callsFake(function(sessions) { + sessions = Array.isArray(sessions) ? sessions : [sessions]; + + sessions.forEach(id => pooledSessions.delete(id)); + + return _endSessions.apply(this, arguments); + }); + }); + + const _close = MongoClient.prototype.close; + sandbox.stub(MongoClient.prototype, 'close').callsFake(function() { + activeSessionsBeforeClose = new Set(activeSessions); + + return _close.apply(this, arguments); + }); +}); + +afterEach('Session Leak After Each - ensure no leaks', function() { + if ( + this.currentTest.state === 'failed' || + getSessionLeakMetadata(this.currentTest).skipLeakTests + ) { + return; + } + + try { + expect( + activeSessionsBeforeClose.size, + `test is leaking ${activeSessionsBeforeClose.size} active sessions while running client` + ).to.equal(0); + + expect( + activeSessions.size, + `client close failed to clean up ${activeSessions.size} active sessions` + ).to.equal(0); + + expect( + pooledSessions.size, + `client close failed to clean up ${pooledSessions.size} pooled sessions` + ).to.equal(0); + } catch (e) { + this.test.error(e); + } +}); diff --git a/test/functional/sharding_read_preference_tests.js b/test/functional/sharding_read_preference_tests.js index 6e116ac195..6e2dc6d179 100644 --- a/test/functional/sharding_read_preference_tests.js +++ b/test/functional/sharding_read_preference_tests.js @@ -42,6 +42,8 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; + // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -49,7 +51,7 @@ describe('Sharding (Read Preference)', function() { options.className === 'Cursor' && options.message.indexOf('"mode":"secondary"') !== -1 ) { - done(); + gotMessage = true; } }); @@ -59,11 +61,13 @@ describe('Sharding (Read Preference)', function() { function(err, item) { test.equal(null, err); test.equal(1, item.test); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -106,6 +110,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -113,7 +118,7 @@ describe('Sharding (Read Preference)', function() { options.className === 'Cursor' && options.message.indexOf('"mode":"notsupported"') !== -1 ) { - done(); + gotMessage = true; } }); @@ -122,11 +127,13 @@ describe('Sharding (Read Preference)', function() { { readPreference: new ReadPreference('notsupported') }, function(err) { test.ok(err != null); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -169,6 +176,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -178,7 +186,7 @@ describe('Sharding (Read Preference)', function() { '{"mode":"secondary","tags":[{"dc":"sf","s":"1"},{"dc":"ma","s":"2"}]}' ) !== -1 ) { - done(); + gotMessage = true; } }); @@ -192,10 +200,12 @@ describe('Sharding (Read Preference)', function() { }, function(err) { test.ok(err != null); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -238,6 +248,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -246,7 +257,7 @@ describe('Sharding (Read Preference)', function() { options.message.indexOf('{"mode":"secondary","tags":[{"loc":"ny"},{"loc":"sf"}]}') !== -1 ) { - done(); + gotMessage = true; } }); @@ -261,10 +272,12 @@ describe('Sharding (Read Preference)', function() { function(err, item) { test.equal(null, err); test.equal(1, item.test); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); diff --git a/test/unit/sessions/client_tests.js b/test/unit/sessions/client_tests.js index 00e371eaa2..99d9b876a3 100644 --- a/test/unit/sessions/client_tests.js +++ b/test/unit/sessions/client_tests.js @@ -60,6 +60,7 @@ describe('Sessions', function() { let session = client.startSession(); expect(session).to.exist; + session.endSession({ skipCommand: true }); client.close(); done(); }); diff --git a/test/unit/sessions/collection_tests.js b/test/unit/sessions/collection_tests.js index 298e13822d..ebcffddebe 100644 --- a/test/unit/sessions/collection_tests.js +++ b/test/unit/sessions/collection_tests.js @@ -44,6 +44,8 @@ describe('Sessions', function() { .then(() => { expect(findCommand.readConcern).to.have.keys(['level', 'afterClusterTime']); expect(findCommand.readConcern.afterClusterTime).to.eql(insertOperationTime); + + session.endSession({ skipCommand: true }); return client.close(); }); });