Skip to content

Commit

Permalink
fix(sessions): move active session tracking to topology base (#1665)
Browse files Browse the repository at this point in the history
Moves the tracking of active sessions to the topology base.
Doing this allows us to ensure that all active and pooled
sessions are ended when the topology closes, and that implicit
sessions are tracked. Also adds a test case to make sure none of
our unit tests are leaking sessions, and corrects many leaky tests.
Also bumps version of mongodb-core

Part of HELP-5384
  • Loading branch information
daprahamian authored Feb 23, 2018
1 parent 7bd5637 commit b1f296f
Show file tree
Hide file tree
Showing 23 changed files with 372 additions and 254 deletions.
3 changes: 2 additions & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"mocha": true
},
"globals": {
"Promise": true
"Promise": true,
"Set": true
},
"parserOptions": {
"ecmaVersion": 2017
Expand Down
16 changes: 1 addition & 15 deletions lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) {
// Remove listeners after emit
self.removeAllListeners('close');

// If we have sessions, we want to send a single `endSessions` command for them,
// and then individually clean them up. They will be removed from the internal state
// when they emit their `ended` events.
if (this.s.sessions.length) {
this.topology.endSessions(this.s.sessions);
this.s.sessions.forEach(session => session.endSession({ skipCommand: true }));
}

// Callback after next event loop tick
if (typeof callback === 'function')
return process.nextTick(function() {
Expand Down Expand Up @@ -507,13 +499,7 @@ MongoClient.prototype.startSession = function(options) {
throw new MongoError('Current topology does not support sessions');
}

const session = this.topology.startSession(options);
session.once('ended', () => {
this.s.sessions = this.s.sessions.filter(s => s.equals(session));
});

this.s.sessions.push(session);
return session;
return this.topology.startSession(options);
};

var mergeOptions = function(target, source, flatten) {
Expand Down
2 changes: 2 additions & 0 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class Mongos extends TopologyBase {
options: options,
// Server Session Pool
sessionPool: null,
// Active client sessions
sessions: [],
// Promise library
promiseLibrary: options.promiseLibrary || Promise
};
Expand Down
19 changes: 4 additions & 15 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class ReplSet extends TopologyBase {
options: options,
// Server Session Pool
sessionPool: null,
// Active client sessions
sessions: [],
// Promise library
promiseLibrary: options.promiseLibrary || Promise
};
Expand Down Expand Up @@ -371,22 +373,9 @@ class ReplSet extends TopologyBase {
}

close(forceClosed) {
var self = this;
// Call destroy on the topology
this.s.coreTopology.destroy({
force: typeof forceClosed === 'boolean' ? forceClosed : false
});

// We need to wash out all stored processes
if (forceClosed === true) {
this.s.storeOptions.force = forceClosed;
this.s.store.flush();
}
super.close(forceClosed);

var events = ['timeout', 'error', 'close', 'joined', 'left'];
events.forEach(function(e) {
self.removeAllListeners(e);
});
['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e));
}
}

Expand Down
2 changes: 2 additions & 0 deletions lib/topologies/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class Server extends TopologyBase {
options: options,
// Server Session Pool
sessionPool: null,
// Active client sessions
sessions: [],
// Promise library
promiseLibrary: promiseLibrary || Promise
};
Expand Down
20 changes: 19 additions & 1 deletion lib/topologies/topology_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,13 @@ class TopologyBase extends EventEmitter {
}

startSession(options) {
return new ClientSession(this, this.s.sessionPool, options);
const session = new ClientSession(this, this.s.sessionPool, options);
session.once('ended', () => {
this.s.sessions = this.s.sessions.filter(s => !s.equals(session));
});

this.s.sessions.push(session);
return session;
}

endSessions(sessions, callback) {
Expand Down Expand Up @@ -388,6 +394,18 @@ class TopologyBase extends EventEmitter {
}

close(forceClosed) {
// If we have sessions, we want to send a single `endSessions` command for them,
// and then individually clean them up. They will be removed from the internal state
// when they emit their `ended` events.
if (this.s.sessions.length) {
this.endSessions(this.s.sessions.map(session => session.id));
this.s.sessions.forEach(session => session.endSession({ skipCommand: true }));
}

if (this.s.sessionPool) {
this.s.sessionPool.endAllPooledSessions();
}

this.s.coreTopology.destroy({
force: typeof forceClosed === 'boolean' ? forceClosed : false
});
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"official"
],
"dependencies": {
"mongodb-core": "3.0.2"
"mongodb-core": "3.0.3"
},
"devDependencies": {
"betterbenchmarks": "^0.1.0",
Expand All @@ -32,6 +32,7 @@
"mongodb-test-runner": "^1.1.18",
"prettier": "^1.5.3",
"semver": "5.4.1",
"sinon": "^4.3.0",
"worker-farm": "^1.5.0"
},
"author": "Christian Kvalheim",
Expand Down
8 changes: 8 additions & 0 deletions test/functional/apm_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,10 @@ describe('APM', function() {
// Get the result
result = results.successes.shift();

if (result.commandName === 'endSessions') {
result = results.successes.shift();
}

// Validate the test
expect(commandName).to.equal(result.commandName);
// Do we have a getMore command
Expand All @@ -1054,6 +1058,10 @@ describe('APM', function() {
results.failures = filterSessionsCommands(results.failures);
result = results.failures.shift();

if (result.commandName === 'endSessions') {
result = results.failures.shift();
}

// Validate the test
expect(commandName).to.equal(result.commandName);
}
Expand Down
2 changes: 1 addition & 1 deletion test/functional/crud_api_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ describe('CRUD API', function() {
test.equal(null, err);

// Delete all items with no selector
db.collection('t6_1').deleteMany(function(err) {
db.collection('t6_1').deleteMany({}, function(err) {
test.equal(null, err);

client.close();
Expand Down
6 changes: 6 additions & 0 deletions test/functional/crud_spec_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ describe('CRUD spec', function() {
});
});

afterEach(() => {
if (testContext.client) {
testContext.client.close();
}
});

describe('read', function() {
readScenarios.forEach(function(scenarioData) {
var scenarioName = scenarioData[0];
Expand Down
14 changes: 9 additions & 5 deletions test/functional/cursor_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -1728,6 +1728,7 @@ describe('Cursor', function() {
test.equal(1, items.length);
test.equal(2, items[0].a);
test.equal(undefined, items[0].x);
client.close();
done();
});
});
Expand Down Expand Up @@ -2296,9 +2297,9 @@ describe('Cursor', function() {

if (count === 0) {
var stream = collection.find({}, { tailable: true, awaitData: true }).stream();

// let index = 0;
stream.on('data', function() {
// console.log("doc :: " + (index++));
// console.log('doc :: ' + index++);
});

stream.on('error', function(err) {
Expand All @@ -2319,14 +2320,17 @@ describe('Cursor', function() {

// Just hammer the server
for (var i = 0; i < 100; i++) {
const id = i;
process.nextTick(function() {
collection.insert({ id: i }, function(err) {
collection.insert({ id }, function(err) {
test.equal(null, err);

if (id === 99) {
setTimeout(() => client.close());
}
});
});
}

setTimeout(() => client.close(), 800);
}
});
}
Expand Down
26 changes: 20 additions & 6 deletions test/functional/cursorstream_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ describe('Cursor Streams', function() {

// When the stream is done
stream.on('end', function() {
expect(data).to.have.length(3000);
client.close();
done();
setTimeout(() => {
let err;
try {
expect(data).to.have.length(3000);
} catch (e) {
err = e;
}
client.close();
done(err);
}, 1000);
});
}
});
Expand Down Expand Up @@ -139,9 +146,16 @@ describe('Cursor Streams', function() {

// When the stream is done
stream.on('end', function() {
expect(data).to.have.length(10000);
client.close();
done();
setTimeout(() => {
let err;
try {
expect(data).to.have.length(10000);
} catch (e) {
err = e;
}
client.close();
done(err);
}, 1000);
});
}
});
Expand Down
3 changes: 3 additions & 0 deletions test/functional/db_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ describe('Db', function() {
coll.findOne({}, null, function() {
//e - errors b/c findOne needs a query selector
test.equal(1, count);
client.close();
done();
});
} catch (e) {
process.nextTick(function() {
test.equal(1, count);
client.close();
done();
});
}
Expand Down Expand Up @@ -465,6 +467,7 @@ describe('Db', function() {
return c.collectionName;
});
test.notEqual(-1, collections.indexOf('node972.test'));
client.close();
done();
});
});
Expand Down
Loading

0 comments on commit b1f296f

Please sign in to comment.