Skip to content

Commit

Permalink
POC fix for redis#732
Browse files Browse the repository at this point in the history
  • Loading branch information
pfreixes committed Jun 21, 2016
1 parent eae07e7 commit 6a95994
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions redis/sentinel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import random
import weakref

from threading import Lock
from itertools import chain
from copy import copy

from redis.client import StrictRedis
from redis.connection import ConnectionPool, Connection
from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
Expand Down Expand Up @@ -65,6 +69,25 @@ def read_response(self):
raise ConnectionError('The previous master is now a slave')
raise

def shutdown_socket(self):
"""
Thread safe function to reset the soket belonging to the connection.
The connection will end up cleaning the socket and its dependencies
in a safe way
"""
try:
self._sock.shutdown(socket.SHUT_RDWR)
except AttributeError:
# either _sock attribute does not exist or
# connection removed it.
pass
except OSError as e:
if e.errno == 107:
# Transport endpoint is not connected
pass
else:
raise


class SentinelConnectionPool(ConnectionPool):
"""
Expand All @@ -83,6 +106,7 @@ def __init__(self, service_name, sentinel_manager, **kwargs):
self.connection_kwargs['connection_pool'] = weakref.proxy(self)
self.service_name = service_name
self.sentinel_manager = sentinel_manager
self._lock_get_master_address = Lock()

def __repr__(self):
return "%s<service=%s(%s)" % (
Expand All @@ -99,12 +123,17 @@ def reset(self):
def get_master_address(self):
master_address = self.sentinel_manager.discover_master(
self.service_name)

if self.is_master:
if self.master_address is None:
self.master_address = master_address
elif master_address != self.master_address:
# Master address changed, disconnect all clients in this pool
self.disconnect()
with self._lock_get_master_address:
if master_address != self.master_address:
# Master address changed, disconnect all clients in this pool
all_conns = chain(copy(self._available_connections),
copy(self._in_use_connections))
for connection in all_conns:
connection.shutdown_socket()
else:
self.master_address = master_address
return master_address

def rotate_slaves(self):
Expand All @@ -125,6 +154,7 @@ def rotate_slaves(self):
pass
raise SlaveNotFoundError('No slave found for %r' % (self.service_name))


def _checkpid(self):
if self.pid != os.getpid():
self.disconnect()
Expand Down

0 comments on commit 6a95994

Please sign in to comment.