Skip to content

Commit

Permalink
Add global key prefix for keys set by Redis transporter (celery#1349)
Browse files Browse the repository at this point in the history
* Introduce global key prefix for redis transport

Co-authored-by: Matus Valo <[email protected]>

* refactor: use a custom redis client

As per the suggestions, refactor the redis key prefixing to use a custom
redis client that prefixes the keys it uses.

The custom client implementation does not prefix every key by default as
the way of prefixing keys may differ for some redis commands, instead it
lists those keys that will be prefixed. In case of commands, where
multiple keys can be passed as an argument, the custom client defines
where the arg positions are starting and ending for the given command.

* test: fix unit tests by moving import statement

* fix: wrap redis.parse_response to remove key prefixes

Co-authored-by: Matus Valo <[email protected]>

* fix: typo

* fix: lint

Co-authored-by: Antonin Delpeuch <[email protected]>
Co-authored-by: Matus Valo <[email protected]>
Co-authored-by: Jillian Vogel <[email protected]>
  • Loading branch information
4 people committed Aug 26, 2021
1 parent 0f63705 commit 61f3f8d
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 15 deletions.
50 changes: 35 additions & 15 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
"""
from __future__ import absolute_import, unicode_literals

import functools
import numbers
import socket

Expand Down Expand Up @@ -183,10 +184,15 @@ def _after_fork_cleanup_channel(channel):
channel._after_fork()


class PrefixedStrictRedis(redis.StrictRedis):
"""Returns a ``StrictRedis`` client that prefixes the keys it uses."""
class GlobalKeyPrefixMixin:
"""Mixin to provide common logic for global key prefixing.
Overriding all the methods used by Kombu with the same key prefixing logic
would be cumbersome and inefficient. Hence, we override the command
execution logic that is called by all commands.
"""

PREFIXABLE_SINGLE_KEY_COMMANDS = [
PREFIXED_SIMPLE_COMMANDS = [
"HDEL",
"HGET",
"HSET",
Expand All @@ -201,25 +207,21 @@ class PrefixedStrictRedis(redis.StrictRedis):
"ZREVRANGEBYSCORE",
]

PREFIXABLE_MULTI_KEYS_COMMANDS = {
PREFIXED_COMPLEX_COMMANDS = {
"BRPOP": {"args_start": 0, "args_end": -1},
"EVALSHA": {"args_start": 2, "args_end": 3},
}

def __init__(self, *args, **kwargs):
self.global_keyprefix = kwargs.pop('global_keyprefix', '')
super().__init__(self, *args, **kwargs)

def _prefix_args(self, args):
args = list(args)
command = args.pop(0)

if command in self.PREFIXABLE_SINGLE_KEY_COMMANDS:
if command in self.PREFIXED_SIMPLE_COMMANDS:
args[0] = self.global_keyprefix + str(args[0])

if command in self.PREFIXABLE_MULTI_KEYS_COMMANDS.keys():
args_start = self.PREFIXABLE_MULTI_KEYS_COMMANDS[command]["args_start"]
args_end = self.PREFIXABLE_MULTI_KEYS_COMMANDS[command]["args_end"]
if command in self.PREFIXED_COMPLEX_COMMANDS.keys():
args_start = self.PREFIXED_COMPLEX_COMMANDS[command]["args_start"]
args_end = self.PREFIXED_COMPLEX_COMMANDS[command]["args_end"]

pre_args = args[:args_start] if args_start > 0 else []

Expand All @@ -238,7 +240,7 @@ def _prefix_args(self, args):
return [command, *args]

def parse_response(self, connection, command_name, **options):
"""Parses a response from the Redis server.
"""Parse a response from the Redis server.
Method wraps ``redis.parse_response()`` to remove prefixes of keys
returned by redis command.
Expand All @@ -263,7 +265,22 @@ def pipeline(self, transaction=True, shard_hint=None):
)


class PrefixedRedisPipeline(PrefixedStrictRedis, redis.client.Pipeline):
class PrefixedStrictRedis(GlobalKeyPrefixMixin, redis.Redis):
"""Returns a ``StrictRedis`` client that prefixes the keys it uses."""

def __init__(self, *args, **kwargs):
self.global_keyprefix = kwargs.pop('global_keyprefix', '')
redis.Redis.__init__(self, *args, **kwargs)


class PrefixedRedisPipeline(GlobalKeyPrefixMixin, redis.client.Pipeline):
"""Custom Redis pipeline that takes global_keyprefix into consideration.
As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
the keys it uses, the pipeline called by the client must be able to prefix
the keys as well.
"""

def __init__(self, *args, **kwargs):
self.global_keyprefix = kwargs.pop('global_keyprefix', '')
redis.client.Pipeline.__init__(self, *args, **kwargs)
Expand Down Expand Up @@ -1135,7 +1152,10 @@ def _get_client(self):
'You have {0.__version__}'.format(redis))

if self.global_keyprefix:
return PrefixedStrictRedis
return functools.partial(
PrefixedStrictRedis,
global_keyprefix=self.global_keyprefix,
)

return redis.StrictRedis

Expand Down
89 changes: 89 additions & 0 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,14 @@ def test_global_keyprefix_queue_bind(self, mock_execute_command):
)


channel._queue_bind('default', '', None, 'queue')
mock_execute_command.assert_called_with(
'SADD',
'foo__kombu.binding.default',
'\x06\x16\x06\x16queue'
)


@skip.unless_module('redis')
class test_Redis:

Expand Down Expand Up @@ -1530,3 +1538,84 @@ def test_can_create_connection(self):
)
with pytest.raises(ConnectionError):
connection.channel()

def test_missing_master_name_transport_option(self):
connection = Connection(
'sentinel://localhost:65534/',
)
with patch('redis.sentinel.Sentinel'), \
pytest.raises(ValueError) as excinfo:
connection.connect()
expected = "'master_name' transport option must be specified."
assert expected == excinfo.value.args[0]

def test_sentinel_with_ssl(self):
ssl_params = {
'ssl_cert_reqs': 2,
'ssl_ca_certs': '/foo/ca.pem',
'ssl_certfile': '/foo/cert.crt',
'ssl_keyfile': '/foo/pkey.key'
}
with patch('redis.sentinel.Sentinel'):
with Connection(
'sentinel://',
transport_options={'master_name': 'not_important'},
ssl=ssl_params) as conn:
params = conn.default_channel._connparams()
assert params['ssl_cert_reqs'] == ssl_params['ssl_cert_reqs']
assert params['ssl_ca_certs'] == ssl_params['ssl_ca_certs']
assert params['ssl_certfile'] == ssl_params['ssl_certfile']
assert params['ssl_keyfile'] == ssl_params['ssl_keyfile']
assert params.get('ssl') is None
from kombu.transport.redis import SentinelManagedSSLConnection
assert (params['connection_class'] is
SentinelManagedSSLConnection)


class test_GlobalKeyPrefixMixin:

from kombu.transport.redis import GlobalKeyPrefixMixin

global_keyprefix = "prefix_"
mixin = GlobalKeyPrefixMixin()
mixin.global_keyprefix = global_keyprefix

def test_prefix_simple_args(self):
for command in self.mixin.PREFIXED_SIMPLE_COMMANDS:
prefixed_args = self.mixin._prefix_args([command, "fake_key"])
assert prefixed_args == [
command,
f"{self.global_keyprefix}fake_key"
]

def test_prefix_brpop_args(self):
prefixed_args = self.mixin._prefix_args([
"BRPOP",
"fake_key",
"fake_key2",
"not_prefixed"
])

assert prefixed_args == [
"BRPOP",
f"{self.global_keyprefix}fake_key",
f"{self.global_keyprefix}fake_key2",
"not_prefixed",
]

def test_prefix_evalsha_args(self):
prefixed_args = self.mixin._prefix_args([
"EVALSHA",
"not_prefixed",
"not_prefixed",
"fake_key",
"not_prefixed",
])

assert prefixed_args == [
"EVALSHA",
"not_prefixed",
"not_prefixed",
f"{self.global_keyprefix}fake_key",
"not_prefixed",
]

0 comments on commit 61f3f8d

Please sign in to comment.