diff --git a/pymongo/monitor.py b/pymongo/monitor.py index 3d8d167b96..81502591bf 100644 --- a/pymongo/monitor.py +++ b/pymongo/monitor.py @@ -177,7 +177,10 @@ def _run(self): # discover that we've been cancelled. self._executor.skip_sleep() return - self._topology.on_change(self._server_description) + + # Update the Topology and clear the server pool on error. + self._topology.on_change(self._server_description, + reset_pool=self._server_description.error) if (self._server_description.is_server_type_known and self._server_description.topology_version): @@ -185,12 +188,9 @@ def _run(self): # Immediately check for the next streaming response. self._executor.skip_sleep() - if self._server_description.error: - # Reset the server pool only after marking the server Unknown. - self._topology.reset_pool(self._server_description.address) - if prev_sd.is_server_type_known: - # Immediately retry on network errors. - self._executor.skip_sleep() + if self._server_description.error and prev_sd.is_server_type_known: + # Immediately retry on network errors. + self._executor.skip_sleep() except ReferenceError: # Topology was garbage-collected. self.close() @@ -377,6 +377,8 @@ def _run(self): def _ping(self): """Run an "isMaster" command and return the RTT.""" with self._pool.get_socket({}) as sock_info: + if self._executor._stopped: + raise Exception('_RttMonitor closed') start = _time() sock_info.ismaster() return _time() - start diff --git a/pymongo/topology.py b/pymongo/topology.py index 749299bb74..eb84a344e0 100644 --- a/pymongo/topology.py +++ b/pymongo/topology.py @@ -265,7 +265,7 @@ def select_server_by_address(self, address, server_selection_timeout, address) - def _process_change(self, server_description): + def _process_change(self, server_description, reset_pool=False): """Process a new ServerDescription on an opened topology. Hold the lock when calling this. @@ -303,10 +303,16 @@ def _process_change(self, server_description): SRV_POLLING_TOPOLOGIES): self._srv_monitor.close() + # Clear the pool from a failed heartbeat. + if reset_pool: + server = self._servers.get(server_description.address) + if server: + server.pool.reset() + # Wake waiters in select_servers(). self._condition.notify_all() - def on_change(self, server_description): + def on_change(self, server_description, reset_pool=False): """Process a new ServerDescription after an ismaster call completes.""" # We do no I/O holding the lock. with self._lock: @@ -320,7 +326,7 @@ def on_change(self, server_description): # that didn't include this server. if (self._opened and self._description.has_server(server_description.address)): - self._process_change(server_description) + self._process_change(server_description, reset_pool) def _process_srv_update(self, seedlist): """Process a new seedlist on an opened topology. @@ -414,20 +420,14 @@ def request_check_all(self, wait_time=5): self._request_check_all() self._condition.wait(wait_time) - def reset_pool(self, address): - with self._lock: - server = self._servers.get(address) - if server: - server.pool.reset() - def handle_getlasterror(self, address, error_msg): """Clear our pool for a server, mark it Unknown, and check it soon.""" error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg}) with self._lock: server = self._servers.get(address) if server: - self._process_change(ServerDescription(address, error=error)) - server.pool.reset() + self._process_change( + ServerDescription(address, error=error), True) server.request_check() def update_pool(self, all_credentials):