Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce global key prefix for redis transport #912

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ class Channel(virtual.Channel):
#: Disable for backwards compatibility with Kombu 3.x.
fanout_patterns = True

#: The global key prefix will be prepended to all keys used
#: by Kombu, which can be useful when a redis database is shared
#: by different users. By default, no prefix is prepended.
global_key_prefix = ''

#: Order in which we consume from queues.
#:
#: Can be either string alias, or a cycle strategy class
Expand Down Expand Up @@ -466,6 +471,7 @@ class Channel(virtual.Channel):
'unacked_restore_limit',
'fanout_prefix',
'fanout_patterns',
'global_key_prefix',
'socket_timeout',
'socket_connect_timeout',
'socket_keepalive',
Expand Down Expand Up @@ -500,6 +506,10 @@ def __init__(self, *args, **kwargs):
# by default.
self.keyprefix_fanout = ''

# Prepend the global key prefix to the two key prefixes in use
self.keyprefix_fanout = self.global_key_prefix + self.keyprefix_fanout
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation: if you notice the comment:

# previous versions did not set a fanout, so cannot enable
# by default.
self.keyprefix_fanout = ''

In the current implementation, the prefix will not be an empty string, thus now a non-empty pattern will be available for subscription:

def _get_publish_topic(self, exchange, routing_key):
if routing_key and self.fanout_patterns:
return ''.join([self.keyprefix_fanout, exchange, '/', routing_key])

Thus if someone uses a global key prefix they will also implicitly use a non-empty fanout prefix. I think if you move the logic for self.keyprefix_fanout global prefixing under line 503 it would be safer, either way. How does that sound?

Copy link
Contributor Author

@wetneb wetneb Oct 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I am not sure I follow the logic. Yes a global key prefix will induce a non-empty fanout prefix. If we move the prefixing under line 503 then by default the global key prefix will not be applied to fanouts by default, so it is not really global, is it?

Compatibility with previous versions is not a concern here: using a global key prefix will make the channel incompatible with any previous versions anyway.

self.keyprefix_queue = self.global_key_prefix + self.keyprefix_queue

# Evaluate connection.
try:
self.client.ping()
Expand Down Expand Up @@ -757,7 +767,8 @@ def _size(self, queue):

def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))
key = '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))
return self.global_key_prefix + key
Copy link
Contributor

@georgepsarakis georgepsarakis Oct 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is redundant, don't the queue names (queue variable here) already contain self.keyprefix_queue, which in its turn is prefixed with the global namespace?

Copy link
Contributor Author

@wetneb wetneb Oct 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@georgepsarakis I was puzzled by this too!

I assume that because this method comes from the base interface, it is just called with the bare queue name without prefix (because prefixes are introduced by the redis implementation).

def _put(self, queue, message):

But I might be wrong! If this method is indeed called with the prefixed queue name, then I guess the second test case becomes trivial…

My analysis is that the original code lacked a concatenation with self.keyprefix_queue here originally. I can change my PR to fix that (instead of just prepending the self.global_key_prefix). At the moment, the self.keyprefix_queue seems to be used only for sets, not lists - this is probably unintentional.


def priority(self, n):
steps = self.priority_steps
Expand Down
21 changes: 21 additions & 0 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,27 @@ def test_rediss_connection(self):
redis.redis.SSLConnection,
)

def test_global_key_prefix_fanout(self):
conn = Connection(transport=Transport, transport_options={
'global_key_prefix': 'foo',
})
chan = conn.channel()
c = chan._create_client = Mock()

body = {'hello': 'world'}
chan._put_fanout('exchange', body, '')
georgepsarakis marked this conversation as resolved.
Show resolved Hide resolved
c().publish.assert_called_with('foo/{db}.exchange', dumps(body))

def test_global_key_prefix_put(self):
conn = Connection(transport=Transport, transport_options={
'global_key_prefix': 'foo',
})
chan = conn.channel()
c = chan._create_client = Mock()
message = {'hello': 'world'}
chan._put('queue', message)
c().lpush.assert_called_with('fooqueue', dumps(message))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@georgepsarakis the test case for Channel._put is here.



@skip.unless_module('redis')
class test_Redis:
Expand Down