Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use thread-local for connecting to redis to avoid race conditions on master failover #2

Merged
merged 3 commits into from
Sep 21, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ python:
- "3.3"
- "3.4"
- "3.5"
- "3.6"
- "pypy"
install:
- "pip install ."
6 changes: 6 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
@@ -87,6 +87,12 @@ Accessing redis-py's Sentinel instance
Change log
----------

v2.0.0
~~~~~~

* Connections are now thread-local to avoid race conditions after Redis master failover
* Removed support for `REDIS_{HOST, PORT, DB}` config variables

v1.0.0
~~~~~~

265 changes: 115 additions & 150 deletions flask_redis_sentinel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2015, 2016 Exponea s r.o. <info@exponea.com>
# Copyright 2015, 2016, 2017 Exponea s r.o. <info@exponea.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,110 +12,89 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple
import six
import inspect
try:
import urllib.parse as urlparse
except ImportError:
import urlparse
from flask import current_app
import warnings
import redis
import redis.sentinel # requires redis-py 2.9.0+
import redis.sentinel
import redis_sentinel_url
import sys
from werkzeug.local import LocalProxy
from flask import current_app
from werkzeug.local import Local, LocalProxy
from werkzeug.utils import import_string


if sys.version_info[0] == 2: # pragma: no cover
# Python 2.x
_string_types = basestring

def iteritems(d):
return d.iteritems()
else: # pragma: no cover
# Python 3.x
_string_types = str

def iteritems(d):
return d.items()

_EXTENSION_KEY = 'redissentinel'


class _ExtensionData(object):
def __init__(self, client_class, sentinel=None, default_connection=None):
class RedisSentinelInstance(object):

def __init__(self, url, client_class, client_options, sentinel_class, sentinel_options):
self.url = url
self.client_class = client_class
self.sentinel = sentinel
self.default_connection = default_connection
self.master_connections = {}
self.slave_connections = {}
self.client_options = client_options
self.sentinel_class = sentinel_class
self.sentinel_options = sentinel_options
self.local = Local()
self._connect()
if self.local.connection[0] is None:
# if there is no sentinel, we don't need to use thread-local storage
self.connection = self.local.connection
self.local = self

def _connect(self):
try:
return self.local.connection
except AttributeError:
conn = redis_sentinel_url.connect(
self.url,
sentinel_class=self.sentinel_class, sentinel_options=self.sentinel_options,
client_class=self.client_class, client_options=self.client_options)
self.local.connection = conn
return conn

@property
def sentinel(self):
return self._connect()[0]

@property
def default_connection(self):
return self._connect()[1]

def master_for(self, service_name, **kwargs):
if self.sentinel is None:
raise ValueError('Cannot get master {} using non-sentinel configuration'.format(service_name))
if service_name not in self.master_connections:
self.master_connections[service_name] = self.sentinel.master_for(service_name, redis_class=self.client_class,
**kwargs)
return self.master_connections[service_name]

def slave_for(self, service_name, **kwargs):
if self.sentinel is None:
raise ValueError('Cannot get slave {} using non-sentinel configuration'.format(service_name))
if service_name not in self.slave_connections:
self.slave_connections[service_name] = self.sentinel.slave_for(service_name, redis_class=self.client_class,
**kwargs)
return self.slave_connections[service_name]


class _ExtensionProxy(LocalProxy):
__slots__ = ('__sentinel',)

def __init__(self, sentinel, local, name=None):
object.__setattr__(self, '_ExtensionProxy__sentinel', sentinel)
super(_ExtensionProxy, self).__init__(local, name=name)

def _get_current_object(self):
app = current_app._get_current_object()
if _EXTENSION_KEY not in app.extensions or self.__sentinel.config_prefix not in app.extensions[_EXTENSION_KEY]:
raise ValueError('RedisSentinel extension with config prefix {} was not initialized for application {}'.
format(self.__sentinel.config_prefix, app.import_name))
ext_data = app.extensions[_EXTENSION_KEY][self.__sentinel.config_prefix]

local = object.__getattribute__(self, '_LocalProxy__local')

return local(ext_data)


class _PrefixedDict(object):
def __init__(self, config, prefix):
self.config = config
self.prefix = prefix

def _key(self, key):
return '{}_{}'.format(self.prefix, key)

def __getitem__(self, item):
return self.config[self._key(item)]
try:
return self.local.master_connections[service_name]
except AttributeError:
self.local.master_connections = {}
except KeyError:
pass

def __setitem__(self, item, value):
self.config[self._key(item)] = value
sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get master {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

def __delitem__(self, item):
del self.config[self._key(item)]
conn = sentinel.master_for(service_name, redis_class=self.client_class, **kwargs)
self.local.master_connections[service_name] = conn
return conn

def __contains__(self, item):
return self._key(item) in self.config
def slave_for(self, service_name, **kwargs):
try:
return self.local.slave_connections[service_name]
except AttributeError:
self.local.slave_connections = {}
except KeyError:
pass

def get(self, item, default=None):
return self.config.get(self._key(item), default)
sentinel = self.sentinel
if sentinel is None:
msg = 'Cannot get slave {} using non-sentinel configuration'
raise RuntimeError(msg.format(service_name))

def pop(self, item, *args, **kwargs):
return self.config.pop(self._key(item), *args, **kwargs)
conn = sentinel.slave_for(service_name, redis_class=self.client_class, **kwargs)
self.local.slave_connections[service_name] = conn
return conn


class SentinelExtension(object):
class RedisSentinel(object):
"""Flask extension that supports connections to master using Redis Sentinel.

Supported URL types:
@@ -124,92 +103,78 @@ class SentinelExtension(object):
rediss://
unix://
"""
def __init__(self, app=None, config_prefix=None, client_class=None, sentinel_class=None):
self.config_prefix = None

def __init__(self, app=None, config_prefix='REDIS', client_class=None, sentinel_class=None):
self.config_prefix = config_prefix
self.client_class = client_class
self.sentinel_class = sentinel_class
if app is not None:
self.init_app(app, config_prefix=config_prefix)
self.default_connection = _ExtensionProxy(self, lambda ext_data: ext_data.default_connection)
self.sentinel = _ExtensionProxy(self, lambda ext_data: ext_data.sentinel)
self.init_app(app)
self.sentinel = LocalProxy(lambda: self.get_instance().sentinel)
self.default_connection = LocalProxy(lambda: self.get_instance().default_connection)

def init_app(self, app, config_prefix=None, client_class=None, sentinel_class=None):
if _EXTENSION_KEY not in app.extensions:
app.extensions[_EXTENSION_KEY] = {}

extensions = app.extensions[_EXTENSION_KEY]
config_prefix = config_prefix or self.config_prefix
app.config.setdefault(config_prefix + '_' + 'URL', 'redis://localhost/0')

if config_prefix is None:
config_prefix = 'REDIS'
config = self._strip_dict_prefix(app.config, config_prefix + '_')

extensions = app.extensions.setdefault(_EXTENSION_KEY, {})
if config_prefix in extensions:
raise ValueError('Config prefix {} already registered'.format(config_prefix))

self.config_prefix = config_prefix
msg = 'Redis sentinel extension with config prefix {} is already registered'
raise RuntimeError(msg.format(config_prefix))

config = _PrefixedDict(app.config, config_prefix)
url = config.get('URL')
client_class = self._resolve_class(
config, 'CLASS', 'client_class', client_class, redis.StrictRedis)
sentinel_class = self._resolve_class(
config, 'SENTINEL_CLASS', 'sentinel_class', sentinel_class, redis.sentinel.Sentinel)

client_class = self._resolve_class(config, 'CLASS', client_class, 'client_class',
default=redis.StrictRedis)
sentinel_class = self._resolve_class(config, 'SENTINEL_CLASS', sentinel_class, 'sentinel_class',
default=redis.sentinel.Sentinel)
url = config.pop('URL')
client_options = self._config_from_variables(config, client_class)
sentinel_options = self._config_from_variables(
self._strip_dict_prefix(config, 'SENTINEL_'), client_class)

data = _ExtensionData(client_class)
extensions[config_prefix] = RedisSentinelInstance(
url, client_class, client_options, sentinel_class, sentinel_options)

if url:
connection_options = self._config_from_variables(config, client_class)
sentinel_options = self._config_from_variables(_PrefixedDict(config, 'SENTINEL'), client_class)

connection_options.pop('host', None)
connection_options.pop('port', None)
connection_options.pop('db', None)

result = redis_sentinel_url.connect(url, sentinel_class=sentinel_class,
sentinel_options=sentinel_options,
client_class=client_class,
client_options=connection_options)
data.sentinel, data.default_connection = result
else:
# Stay compatible with Flask-And-Redis for a while
warnings.warn('Setting redis connection via separate variables is deprecated. Please use REDIS_URL.',
DeprecationWarning)
kwargs = self._config_from_variables(config, client_class)
data.default_connection = client_class(**kwargs)

extensions[config_prefix] = data
self.config_prefix = config_prefix

def _resolve_class(self, config, config_key, the_class, attr, default):
if the_class is not None:
pass
elif getattr(self, attr) is not None:
def _resolve_class(self, config, config_key, attr, the_class, default_class):
if the_class is None:
the_class = getattr(self, attr)
else:
the_class = config.get(config_key, default)
if isinstance(the_class, _string_types):
the_class = import_string(the_class)
if the_class is None:
the_class = config.get(config_key, default_class)
if isinstance(the_class, six.string_types):
the_class = import_string(the_class)
config.pop(config_key, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to remove the key from config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted the config to be clean when it's passed to the function that matches the keys to the constructor arguments. In this case I know I don't want the class name to be passed as an argument. I can remove it if you prefer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, let's keep it as it is then.

return the_class

@staticmethod
def _config_from_variables(config, client_class):
host = config.get('HOST')
if host and (host.startswith('file://') or host.startswith('/')):
del config['HOST']
config['UNIX_SOCKET_PATH'] = host
def _strip_dict_prefix(orig, prefix):
return {k[len(prefix):]: v for (k, v) in six.iteritems(orig) if k.startswith(prefix)}

args = inspect.getargspec(client_class.__init__).args
@staticmethod
def _config_from_variables(config, the_class):
args = inspect.getargspec(the_class.__init__).args
args.remove('self')
args.remove('host')
args.remove('port')
args.remove('db')

def get_config(suffix):
value = config[suffix]
if suffix == 'PORT':
return int(value)
return value
return {arg: config[arg.upper()] for arg in args if arg.upper() in config}

return {arg: get_config(arg.upper()) for arg in args if arg.upper() in config}
def get_instance(self):
app = current_app._get_current_object()
if _EXTENSION_KEY not in app.extensions or self.config_prefix not in app.extensions[_EXTENSION_KEY]:
msg = 'Redis sentinel extension with config prefix {} was not initialized for application {}'
raise RuntimeError(msg.format(self.config_prefix, app.import_name))
return app.extensions[_EXTENSION_KEY][self.config_prefix]

def master_for(self, service_name, **kwargs):
return _ExtensionProxy(self, lambda ext_data: ext_data.master_for(service_name, **kwargs))
return LocalProxy(lambda: self.get_instance().master_for(service_name, **kwargs))

def slave_for(self, service_name, **kwargs):
return _ExtensionProxy(self, lambda ext_data: ext_data.slave_for(service_name, **kwargs))
return LocalProxy(lambda: self.get_instance().slave_for(service_name, **kwargs))


SentinelExtension = RedisSentinel # for backwards-compatibility
8 changes: 5 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
@@ -5,12 +5,12 @@
setup(
name='Flask-Redis-Sentinel',
py_modules=['flask_redis_sentinel'],
version='1.0.0',
install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0'],
version='2.0.0',
install_requires=['Flask>=0.10.1', 'redis>=2.10.3', 'redis_sentinel_url>=1.0.0,<2.0.0', 'six'],
description='Redis-Sentinel integration for Flask',
url='https://github.com/exponea/flask-redis-sentinel',
author='Martin Sucha',
author_email='martin.sucha@infinario.com',
author_email='martin.sucha@exponea.com',
license='Apache 2.0',
classifiers=[
'Programming Language :: Python',
@@ -19,6 +19,8 @@
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Operating System :: OS Independent',
'License :: OSI Approved :: Apache Software License',
'Development Status :: 5 - Production/Stable',
Loading