Skip to content

Commit

Permalink
Disconnect clients from pool safely
Browse files Browse the repository at this point in the history
This PR tries to solve the issues raised by redis#732 regarding
the danger of disconnect clients from the `ConnectionPool.disconnect`
method executed by a Thread different that those ones that are in charge
of the connections.

Instead of call the `Connection.disconnect` method it uses the syscall
`shutdown` to leave the socket unusable. Once the connection tries to use
the socket, even when it is already blocked such us the `PubSub` pattern, it
gets a `socket.error` exception that will be cactched by the
`Connection` class to then raise an `ConnectionError` and disconnect the
socket in a clean and safe way.

The `Client.execute_command` function catches the `ConnectionError` exception
and tries to connect again and run the command that raised the error.
Worth mentioning that in the case of the `Sentinel` environment, if some
changes regarding the Redis pool of servers - perhaps the mater went
down and a slave was promoted - the next command will be executed using
an other server.
  • Loading branch information
pfreixes committed Jun 19, 2016
1 parent 20fc04e commit 92dfcc5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
18 changes: 16 additions & 2 deletions redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,20 @@ def disconnect(self):
pass
self._sock = None

def shutdown_socket(self):
"""
Shutdown the socket hold by the current connection, called from
the connection pool class u other manager to singal it that has to be
disconnected in a thread safe way. Later the connection instance
will get an error and will call `disconnect` by it self.
"""
try:
self._sock.shutdown(socket.SHUT_RDWR)
except AttributeError:
# either _sock attribute does not exist or
# connection thread removed it.
pass

def send_packed_command(self, command):
"Send an already packed command to the Redis server"
if not self._sock:
Expand Down Expand Up @@ -953,7 +967,7 @@ def disconnect(self):
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()
connection.shutdown_socket()


class BlockingConnectionPool(ConnectionPool):
Expand Down Expand Up @@ -1072,4 +1086,4 @@ def release(self, connection):
def disconnect(self):
"Disconnects all connections in the pool."
for connection in self._connections:
connection.disconnect()
connection.shutdown_socket()
26 changes: 26 additions & 0 deletions tests/test_connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import with_statement
from mock import Mock

import os
import pytest
import redis
Expand Down Expand Up @@ -69,6 +71,30 @@ def test_repr_contains_db_info_unix(self):
expected = 'ConnectionPool<UnixDomainSocketConnection<path=/abc,db=1>>'
assert repr(pool) == expected

def test_disconnect_active_connections(self):

class MyConnection(redis.Connection):

connect_calls = 0

def __init__(self, *args, **kwargs):
super(MyConnection, self).__init__(*args, **kwargs)
self.register_connect_callback(self.count_connect)

def count_connect(self, connection):
MyConnection.connect_calls += 1

pool = self.get_pool(connection_class=MyConnection)
r = redis.StrictRedis(connection_pool=pool)
r.ping()
pool.disconnect()
r.ping()

# If the connection is not disconnected by the pool the
# callback belonging to Connection will be called just
# one time.
assert MyConnection.connect_calls == 2


class TestBlockingConnectionPool(object):
def get_pool(self, connection_kwargs=None, max_connections=10, timeout=20):
Expand Down

0 comments on commit 92dfcc5

Please sign in to comment.