Skip to content

Commit

Permalink
Introduce global key prefix for redis transport
Browse files Browse the repository at this point in the history
Co-authored-by: Matus Valo <[email protected]>

refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

docs: update authors doc
(cherry picked from commit 3fe1f880846fa44b346496390e4198859662476c)

squashed commits from celery#1349
  • Loading branch information
wetneb authored and pomegranited committed Aug 20, 2021
1 parent 8a0b334 commit ba8b2f9
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 6 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Fernando Jorge Mota <[email protected]>
Flavio [FlaPer87] Percoco Premoli <[email protected]>
Florian Munz <[email protected]>
Franck Cuny <[email protected]>
Gábor Boros <[email protected]>
Germán M. Bravo <[email protected]>
Gregory Haskins <[email protected]>
Hank John <[email protected]>
Expand Down
154 changes: 149 additions & 5 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,55 @@
"""Redis transport."""
"""Redis transport module for Kombu.
Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No
Connection String
=================
Connection string has the following format:
.. code-block::
redis://REDIS_ADDRESS[:PORT][/VIRTUALHOST]
rediss://REDIS_ADDRESS[:PORT][/VIRTUALHOST]
To use sentinel for dynamic Redis discovery,
the connection string has following format:
.. code-block::
sentinel://SENTINEL_ADDRESS[:PORT]
Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
"""
from __future__ import absolute_import, unicode_literals

import numbers
Expand Down Expand Up @@ -132,6 +183,79 @@ def _after_fork_cleanup_channel(channel):
channel._after_fork()


class PrefixedStrictRedis(redis.StrictRedis):
"""Returns a ``StrictRedis`` client that prefixes the keys it uses."""

PREFIXABLE_SINGLE_KEY_COMMANDS = [
"HDEL",
"HGET",
"HSET",
"LLEN",
"LPUSH",
"PUBLISH",
"SADD",
"SET",
"SMEMBERS",
"ZADD",
"ZREM",
"ZREVRANGEBYSCORE",
]

PREFIXABLE_MULTI_KEYS_COMMANDS = {
"BRPOP": {"args_start": 0, "args_end": -1},
"EVALSHA": {"args_start": 2, "args_end": 3},
}

def __init__(self, *args, **kwargs):
self.global_keyprefix = kwargs.pop('global_keyprefix', '')
super().__init__(self, *args, **kwargs)

def _prefix_args(self, args):
args = list(args)
command = args.pop(0)

if command in self.PREFIXABLE_SINGLE_KEY_COMMANDS:
args[0] = self.global_keyprefix + str(args[0])

if command in self.PREFIXABLE_MULTI_KEYS_COMMANDS.keys():
args_start = self.PREFIXABLE_MULTI_KEYS_COMMANDS[command]["args_start"]
args_end = self.PREFIXABLE_MULTI_KEYS_COMMANDS[command]["args_end"]

pre_args = args[:args_start] if args_start > 0 else []

if args_end is not None:
post_args = args[args_end:]
elif args_end < 0:
post_args = args[len(args):]
else:
post_args = []

args = pre_args + [
self.global_keyprefix + str(arg)
for arg in args[args_start:args_end]
] + post_args

return [command, *args]

def execute_command(self, *args, **kwargs):
return super().execute_command(*self._prefix_args(args), **kwargs)

def pipeline(self, transaction=True, shard_hint=None):
return PrefixedRedisPipeline(
self.connection_pool,
self.response_callbacks,
transaction,
shard_hint,
global_keyprefix=self.global_keyprefix,
)


class PrefixedRedisPipeline(PrefixedStrictRedis, redis.client.Pipeline):
def __init__(self, *args, **kwargs):
self.global_keyprefix = kwargs.pop('global_keyprefix', '')
redis.client.Pipeline.__init__(self, *args, **kwargs)


class QoS(virtual.QoS):
"""Redis Ack Emulation."""

Expand Down Expand Up @@ -441,6 +565,11 @@ class Channel(virtual.Channel):
#: Disable for backwards compatibility with Kombu 3.x.
fanout_patterns = True

#: The global key prefix will be prepended to all keys used
#: by Kombu, which can be useful when a redis database is shared
#: by different users. By default, no prefix is prepended.
global_keyprefix = ''

#: Order in which we consume from queues.
#:
#: Can be either string alias, or a cycle strategy class
Expand Down Expand Up @@ -482,6 +611,7 @@ class Channel(virtual.Channel):
'unacked_restore_limit',
'fanout_prefix',
'fanout_patterns',
'global_keyprefix',
'socket_timeout',
'socket_connect_timeout',
'socket_keepalive',
Expand Down Expand Up @@ -724,7 +854,12 @@ def _brpop_start(self, timeout=1):
keys = [self._q_for_pri(queue, pri) for pri in self.priority_steps
for queue in queues] + [timeout or 0]
self._in_poll = self.client.connection
self.client.connection.send_command('BRPOP', *keys)

command_args = ['BRPOP', *keys]
if self.global_keyprefix:
command_args = self.client._prefix_args(command_args)

self.client.connection.send_command(*command_args)

def _brpop_read(self, **options):
try:
Expand Down Expand Up @@ -966,9 +1101,14 @@ def disconnect(self):
return connparams

def _create_client(self, asynchronous=False):
if asynchronous:
return self.Client(connection_pool=self.async_pool)
return self.Client(connection_pool=self.pool)
kwargs = {
"connection_pool": self.async_pool if asynchronous else self.pool
}

if self.global_keyprefix:
kwargs.update({"global_keyprefix": self.global_keyprefix})

return self.Client(**kwargs)

def _get_pool(self, asynchronous=False):
params = self._connparams(asynchronous=asynchronous)
Expand All @@ -980,6 +1120,10 @@ def _get_client(self):
raise VersionMismatch(
'Redis transport requires redis-py versions 3.2.0 or later. '
'You have {0.__version__}'.format(redis))

if self.global_keyprefix:
return PrefixedStrictRedis

return redis.StrictRedis

@contextmanager
Expand Down
45 changes: 44 additions & 1 deletion t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ def test_rotate_cycle_ValueError(self):
def test_get_client(self):
import redis as R
KombuRedis = redis.Channel._get_client(self.channel)
assert KombuRedis
assert isinstance(KombuRedis(), R.StrictRedis)

Rv = getattr(R, 'VERSION', None)
try:
Expand All @@ -755,6 +755,12 @@ def test_get_client(self):
if Rv is not None:
R.VERSION = Rv

def test_get_prefixed_client(self):
from kombu.transport.redis import PrefixedStrictRedis
self.channel.global_keyprefix = "test_"
PrefixedRedis = redis.Channel._get_client(self.channel)
assert isinstance(PrefixedRedis(), PrefixedStrictRedis)

def test_get_response_error(self):
from redis.exceptions import ResponseError
assert redis.Channel._get_response_error(self.channel) is ResponseError
Expand Down Expand Up @@ -924,6 +930,43 @@ def test_sep_transport_option(self):
('celery', '', 'celery'),
]

@patch("redis.StrictRedis.execute_command")
def test_global_keyprefix(self, mock_execute_command):
from kombu.transport.redis import PrefixedStrictRedis

with Connection(transport=Transport) as conn:
client = PrefixedStrictRedis(global_keyprefix='foo_')

channel = conn.channel()
channel._create_client = Mock()
channel._create_client.return_value = client

body = {'hello': 'world'}
channel._put_fanout('exchange', body, '')
mock_execute_command.assert_called_with(
'PUBLISH',
'foo_/{db}.exchange',
dumps(body)
)

@patch("redis.StrictRedis.execute_command")
def test_global_keyprefix_queue_bind(self, mock_execute_command):
from kombu.transport.redis import PrefixedStrictRedis

with Connection(transport=Transport) as conn:
client = PrefixedStrictRedis(global_keyprefix='foo_')

channel = conn.channel()
channel._create_client = Mock()
channel._create_client.return_value = client

channel._queue_bind('default', '', None, 'queue')
mock_execute_command.assert_called_with(
'SADD',
'foo__kombu.binding.default',
'\x06\x16\x06\x16queue'
)


@skip.unless_module('redis')
class test_Redis:
Expand Down

0 comments on commit ba8b2f9

Please sign in to comment.