Skip to content

Commit

Permalink
feat: add global key prefix to redis transport
Browse files Browse the repository at this point in the history
  • Loading branch information
wetneb authored and gabor-boros committed Jun 30, 2021
1 parent e5dbfed commit d3f6435
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
31 changes: 31 additions & 0 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
Expand Down Expand Up @@ -485,6 +487,11 @@ class Channel(virtual.Channel):
#: Disable for backwards compatibility with Kombu 3.x.
fanout_patterns = True

#: The global key prefix will be prepended to all keys used
#: by Kombu, which can be useful when a redis database is shared
#: by different users. By default, no prefix is prepended.
global_keyprefix = ''

#: Order in which we consume from queues.
#:
#: Can be either string alias, or a cycle strategy class
Expand Down Expand Up @@ -526,6 +533,7 @@ class Channel(virtual.Channel):
'unacked_restore_limit',
'fanout_prefix',
'fanout_patterns',
'global_keyprefix',
'socket_timeout',
'socket_connect_timeout',
'socket_keepalive',
Expand Down Expand Up @@ -562,6 +570,23 @@ def __init__(self, *args, **kwargs):
# by default.
self.keyprefix_fanout = ''

# Prepend the global key prefix
self.unacked_key = self._queue_with_prefix(self.unacked_key)
self.unacked_index_key = self._queue_with_prefix(
self.unacked_index_key
)
self.unacked_mutex_key = self._queue_with_prefix(
self.unacked_mutex_key
)

# The default `keyprefix_queue` starts with an underscore, therefore
# adding a prefix ending an undescore will result in double
# underscores. Since both `keyprefix_queue` and `global_keyprefix`
# can be set by the user, this behavior is better than manipulating
# `keyprefix_queue` here.
self.keyprefix_queue = self._queue_with_prefix(self.keyprefix_queue)
self.keyprefix_fanout = self._queue_with_prefix(self.keyprefix_fanout)

# Evaluate connection.
try:
self.client.ping()
Expand All @@ -577,6 +602,10 @@ def __init__(self, *args, **kwargs):
if register_after_fork is not None:
register_after_fork(self, _after_fork_cleanup_channel)

def _queue_with_prefix(self, queue):
"""Return the queue name prefixed with `global_keyprefix` if set."""
return self.global_keyprefix + queue

def _after_fork(self):
self._disconnect_pools()

Expand Down Expand Up @@ -632,6 +661,7 @@ def _restore_at_beginning(self, message):
return self._restore(message, leftmost=True)

def basic_consume(self, queue, *args, **kwargs):
queue = self._queue_with_prefix(queue)
if queue in self._fanout_queues:
exchange, _ = self._fanout_queues[queue]
self.active_fanout_queues.add(queue)
Expand Down Expand Up @@ -846,6 +876,7 @@ def _new_queue(self, queue, auto_delete=False, **kwargs):
self.auto_delete_queues.add(queue)

def _queue_bind(self, exchange, routing_key, pattern, queue):
queue = self._queue_with_prefix(queue)
if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
self._fanout_queues[queue] = (
Expand Down
23 changes: 23 additions & 0 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,29 @@ def test_sep_transport_option(self):
('celery', '', 'celery'),
]

def test_global_keyprefix(self):
with Connection(transport=Transport, transport_options={
'global_keyprefix': 'foo',
}) as conn:
channel = conn.channel()
c = channel._create_client = Mock()

body = {'hello': 'world'}
channel._put_fanout('exchange', body, '')
c().publish.assert_called_with('foo/{db}.exchange', dumps(body))

def test_global_keyprefix_queue_bind(self):
with Connection(transport=Transport, transport_options={
'global_keyprefix': 'foo',
}) as conn:
channel = conn.channel()
c = channel._create_client = Mock()
channel._queue_bind('default', '', None, 'queue')
c().sadd.assert_called_with(
'foo_kombu.binding.default',
'\x06\x16\x06\x16fooqueue'
)


class test_Redis:

Expand Down

0 comments on commit d3f6435

Please sign in to comment.