Skip to content

Commit

Permalink
PYTHON-2328 Reset the connection pool in Topology.on_change (#470)
Browse files Browse the repository at this point in the history
PYTHON-2304 Ensure _RttMonitor closes socket on when the client is closed
  • Loading branch information
ShaneHarvey authored Jul 27, 2020
1 parent c16b5b9 commit b04e334
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
16 changes: 9 additions & 7 deletions pymongo/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,20 +177,20 @@ def _run(self):
# discover that we've been cancelled.
self._executor.skip_sleep()
return
self._topology.on_change(self._server_description)

# Update the Topology and clear the server pool on error.
self._topology.on_change(self._server_description,
reset_pool=self._server_description.error)

if (self._server_description.is_server_type_known and
self._server_description.topology_version):
self._start_rtt_monitor()
# Immediately check for the next streaming response.
self._executor.skip_sleep()

if self._server_description.error:
# Reset the server pool only after marking the server Unknown.
self._topology.reset_pool(self._server_description.address)
if prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
if self._server_description.error and prev_sd.is_server_type_known:
# Immediately retry on network errors.
self._executor.skip_sleep()
except ReferenceError:
# Topology was garbage-collected.
self.close()
Expand Down Expand Up @@ -377,6 +377,8 @@ def _run(self):
def _ping(self):
"""Run an "isMaster" command and return the RTT."""
with self._pool.get_socket({}) as sock_info:
if self._executor._stopped:
raise Exception('_RttMonitor closed')
start = _time()
sock_info.ismaster()
return _time() - start
Expand Down
22 changes: 11 additions & 11 deletions pymongo/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def select_server_by_address(self, address,
server_selection_timeout,
address)

def _process_change(self, server_description):
def _process_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription on an opened topology.
Hold the lock when calling this.
Expand Down Expand Up @@ -303,10 +303,16 @@ def _process_change(self, server_description):
SRV_POLLING_TOPOLOGIES):
self._srv_monitor.close()

# Clear the pool from a failed heartbeat.
if reset_pool:
server = self._servers.get(server_description.address)
if server:
server.pool.reset()

# Wake waiters in select_servers().
self._condition.notify_all()

def on_change(self, server_description):
def on_change(self, server_description, reset_pool=False):
"""Process a new ServerDescription after an ismaster call completes."""
# We do no I/O holding the lock.
with self._lock:
Expand All @@ -320,7 +326,7 @@ def on_change(self, server_description):
# that didn't include this server.
if (self._opened and
self._description.has_server(server_description.address)):
self._process_change(server_description)
self._process_change(server_description, reset_pool)

def _process_srv_update(self, seedlist):
"""Process a new seedlist on an opened topology.
Expand Down Expand Up @@ -414,20 +420,14 @@ def request_check_all(self, wait_time=5):
self._request_check_all()
self._condition.wait(wait_time)

def reset_pool(self, address):
with self._lock:
server = self._servers.get(address)
if server:
server.pool.reset()

def handle_getlasterror(self, address, error_msg):
"""Clear our pool for a server, mark it Unknown, and check it soon."""
error = NotMasterError(error_msg, {'code': 10107, 'errmsg': error_msg})
with self._lock:
server = self._servers.get(address)
if server:
self._process_change(ServerDescription(address, error=error))
server.pool.reset()
self._process_change(
ServerDescription(address, error=error), True)
server.request_check()

def update_pool(self, all_credentials):
Expand Down

0 comments on commit b04e334

Please sign in to comment.