diff --git a/lib/core/connection/pool.js b/lib/core/connection/pool.js index a020840a4b..a35e10c9a5 100644 --- a/lib/core/connection/pool.js +++ b/lib/core/connection/pool.js @@ -636,6 +636,7 @@ function destroy(self, connections, options, callback) { */ Pool.prototype.destroy = function(force, callback) { var self = this; + // Do not try again if the pool is already dead if (this.state === DESTROYED || self.state === DESTROYING) { if (typeof callback === 'function') callback(null, null); @@ -958,15 +959,6 @@ function createConnection(pool, callback) { pool.logger.debug(`connection attempt failed with error [${JSON.stringify(err)}]`); } - if (pool.options.legacyCompatMode === false) { - // The unified topology uses the reported `error` from a pool to track what error - // reason is returned to the user during selection timeout. We only want to emit - // this if the pool is active because the listeners are removed on destruction. - if (pool.state !== DESTROYED && pool.state !== DESTROYING) { - pool.emit('error', err); - } - } - // check if reconnect is enabled, and attempt retry if so if (!pool.reconnectId && pool.options.reconnect) { if (pool.state === CONNECTING && pool.options.legacyCompatMode) { @@ -1044,6 +1036,7 @@ function _execute(self) { // operations if (self.connectingConnections > 0) { self.executing = false; + setTimeout(() => _execute(self)(), 10); return; } diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index 19f94cd84b..32767a2f5b 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -148,16 +148,13 @@ class Server extends EventEmitter { { bson: this.s.bson } ); - // NOTE: this should only be the case if we are connecting to a single server - poolOptions.reconnect = true; + // NOTE: reconnect is explicitly false because of the server selection loop + poolOptions.reconnect = false; poolOptions.legacyCompatMode = false; this.s.pool = new Pool(this, poolOptions); // setup listeners - this.s.pool.on('connect', connectEventHandler(this)); - this.s.pool.on('close', errorEventHandler(this)); - this.s.pool.on('error', errorEventHandler(this)); this.s.pool.on('parseError', parseErrorEventHandler(this)); // it is unclear whether consumers should even know about these events @@ -169,14 +166,7 @@ class Server extends EventEmitter { relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']); stateTransition(this, STATE_CONNECTING); - - // If auth settings have been provided, use them - if (options.auth) { - this.s.pool.connect.apply(this.s.pool, options.auth); - return; - } - - this.s.pool.connect(); + this.s.pool.connect(connectEventHandler(this)); } /** @@ -474,7 +464,13 @@ function executeWriteOperation(args, options, callback) { } function connectEventHandler(server) { - return function(pool, conn) { + return function(err, conn) { + if (err) { + server.emit('error', new MongoNetworkError(err)); + server.emit('close'); + return; + } + const ismaster = conn.ismaster; server.s.lastIsMasterMS = conn.lastIsMasterMS; if (conn.agreedCompressor) { @@ -506,16 +502,6 @@ function connectEventHandler(server) { }; } -function errorEventHandler(server) { - return function(err) { - if (err) { - server.emit('error', new MongoNetworkError(err)); - } - - server.emit('close'); - }; -} - function parseErrorEventHandler(server) { return function(err) { stateTransition(this, STATE_CLOSED); diff --git a/lib/core/sdam/topology.js b/lib/core/sdam/topology.js index e8f1dc93dd..343db2e707 100644 --- a/lib/core/sdam/topology.js +++ b/lib/core/sdam/topology.js @@ -894,15 +894,6 @@ function selectServers(topology, selector, timeout, start, callback) { topology.s.monitorTimers.push(timer); }); - const descriptionChangedHandler = () => { - // successful iteration, clear the check timer - clearTimeout(iterationTimer); - topology.s.iterationTimers.splice(timerIndex, 1); - - // topology description has changed due to monitoring, reattempt server selection - selectServers(topology, selector, timeout, start, callback); - }; - const iterationTimer = setTimeout(() => { topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler); callback( @@ -913,8 +904,17 @@ function selectServers(topology, selector, timeout, start, callback) { ); }, timeout - duration); + const descriptionChangedHandler = () => { + // successful iteration, clear the check timer + removeTimerFrom(iterationTimer, topology.s.iterationTimers); + clearTimeout(iterationTimer); + + // topology description has changed due to monitoring, reattempt server selection + selectServers(topology, selector, timeout, start, callback); + }; + // track this timer in case we need to clean it up outside this loop - const timerIndex = topology.s.iterationTimers.push(iterationTimer); + topology.s.iterationTimers.push(iterationTimer); topology.once('topologyDescriptionChanged', descriptionChangedHandler); }; @@ -922,7 +922,7 @@ function selectServers(topology, selector, timeout, start, callback) { retrySelection(); } -function createAndConnectServer(topology, serverDescription) { +function createAndConnectServer(topology, serverDescription, connectDelay) { topology.emit( 'serverOpening', new monitoring.ServerOpeningEvent(topology.s.id, serverDescription.address) @@ -934,10 +934,45 @@ function createAndConnectServer(topology, serverDescription) { server.once('connect', serverConnectEventHandler(server, topology)); server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology)); server.on('error', serverErrorEventHandler(server, topology)); + + if (connectDelay) { + const connectTimer = setTimeout(() => { + removeTimerFrom(connectTimer, topology.s.iterationTimers); + server.connect(); + }, connectDelay); + + topology.s.iterationTimers.push(connectTimer); + return server; + } + server.connect(); return server; } +function resetServer(topology, serverDescription) { + if (!topology.s.servers.has(serverDescription.address)) { + return; + } + + // first remove the old server + const server = topology.s.servers.get(serverDescription.address); + destroyServer(server, topology); + + // add the new server, and attempt connection after a delay + const newServer = createAndConnectServer( + topology, + serverDescription, + topology.s.heartbeatFrequencyMS + ); + + topology.s.servers.set(serverDescription.address, newServer); +} + +function removeTimerFrom(timer, timers) { + const idx = timers.findIndex(t => t === timer); + timers.splice(idx, 1); +} + /** * Create `Server` instances for all initially known servers, connect them, and assign * them to the passed in `Topology`. @@ -954,6 +989,15 @@ function connectServers(topology, serverDescriptions) { } function updateServers(topology, incomingServerDescription) { + // if the server was reset internally because of an error, we need to replace the + // `Server` instance for it so we can attempt reconnect. + // + // TODO: this logical can change once CMAP is put in place + if (incomingServerDescription && incomingServerDescription.error) { + resetServer(topology, incomingServerDescription); + return; + } + // update the internal server's description if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) { const server = topology.s.servers.get(incomingServerDescription.address);