
Provide an alternative channel layer implementation that uses Redis Pub/Sub #247

Closed
acu192 wants to merge 23 commits

Conversation

acu192 (Collaborator) commented Apr 28, 2021

Fixes Issue #245

acu192 (Collaborator, Author) commented Apr 28, 2021

There is one logic bug, but I'll propose that we do not fix it.

This implementation does not support a Consumer doing this:

self.channel_layer.group_add("whatever_group", someone_elses_channel_name)

Rather, you need to stick with the normal pattern of only adding yourself to groups:

self.channel_layer.group_add("whatever_group", self.channel_name)

Stated another way: One consumer cannot subscribe a different consumer (denoted by someone_elses_channel_name above) to a group.

Again, I would propose that we let this be a documented limitation of this implementation. Fixing it is possible by publishing special messages destined for someone_elses_channel_name to tell that consumer to do its own subscribing... but that logic would be convoluted and error-prone, and I doubt this will actually be an issue for most users anyway, so it's not worth it.
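
For reference, the supported pattern is just the standard Channels consumer flow, e.g. (a minimal sketch; the consumer and group names are made up):

from channels.generic.websocket import AsyncJsonWebsocketConsumer

class ChatConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        # Under this implementation a consumer may only subscribe *itself* to a group.
        await self.channel_layer.group_add("whatever_group", self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard("whatever_group", self.channel_name)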

acu192 (Collaborator, Author) commented Apr 28, 2021

The other thing to note: Not all configuration parameters are supported (yet).

expiry

I don't think this is needed. A consumer's own message queue will be cleaned up when that consumer disconnects, which essentially "expires" those messages for us (removes them from memory, that is).

group_expiry

Similarly, a group naturally "expires" when no one is subscribed to it.

capacity

This might make sense to implement. It wouldn't be hard; just add a check of the message queue length inside _do_receiving().

channel_capacity

Same...

symmetric_encryption_keys

This would be easy enough to add: just pass these keys along to the two places where aioredis.create_redis() is used.
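
For context, here is a minimal sketch of what configuring this layer could look like in settings.py (the backend path is an assumption based on the pubsub.py rename discussed later in this PR):

CHANNEL_LAYERS = {
    "default": {
        # Backend path assumed from the pubsub.py rename later in this PR.
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
            # Options such as "capacity", "channel_capacity" and
            # "symmetric_encryption_keys" are not supported yet (see above).
        },
    },
}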

aryaniyaps commented Apr 29, 2021

@acu192 But there is a workaround for this.
In order to achieve this, each user must have a private channel group of their own.

When you need to add a channel to a group, you send it a special message internally.

# group_send() takes the target group name and a message dict:
await self.channel_layer.group_send('user_private_channel', {
    'type': 'join_group',
    'group_name': 'group_name_here'
})

Similarly, for leaving groups, we could have an event handler called leave_group.
When the consumer receives one of these special events, it must subscribe (or unsubscribe) itself internally.
These event type handlers must be reserved, and then we can actually add channels to groups from the outside.

If you have any more questions or I haven't explained this clearly, please let me know.
Thanks!
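
A rough sketch of what those reserved handlers could look like on the consumer side (the class name, group naming, and authenticated scope["user"] are assumptions, not part of this PR):

from channels.generic.websocket import AsyncJsonWebsocketConsumer

class BoxConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        # Every consumer joins a private group of its own, so it can be
        # reached individually from anywhere else in the project.
        self.private_group = f"user.{self.scope['user'].pk}"
        await self.channel_layer.group_add(self.private_group, self.channel_name)
        await self.accept()

    async def join_group(self, event):
        # Reserved event type: the consumer subscribes *itself* when told to.
        await self.channel_layer.group_add(event["group_name"], self.channel_name)

    async def leave_group(self, event):
        await self.channel_layer.group_discard(event["group_name"], self.channel_name)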

acu192 (Collaborator, Author) commented Apr 29, 2021

each user must have a private channel of their own

This is already the case (via self.channel_name, which is created by the channel layer's new_channel() method). But maybe I'm not understanding what you're proposing; are you proposing that each consumer has a second private channel?

If you have any more questions or I haven't explained this clearly. please let me know.

I'm interested in thinking about this more, although I'm not seeing this as a solution yet. For example, a common use-case in my app (and I assume other apps) is to add a channel to a group then immediately do a group_send() to that group. In such a case, that new channel should get that group message (because your await group_add(...) returned, so you would assume it happened). However, with the solution you propose there would be a delay between the await group_add(...) returning to when the channel is actually added to the group (because of those special messages propagating through and needing to be processed on the other end). Therefore, a group_add() followed immediately by a group_send() would work if the channel was in-process, but would not work if the channel was "outside" the process (to use your term).
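
For illustration, the pattern in question is roughly (the group name and event type are made up):

# Inside a consumer method: join a group, then immediately fan out to it.
await self.channel_layer.group_add("room_42", self.channel_name)
await self.channel_layer.group_send("room_42", {"type": "chat.message", "text": "hi"})
# With the proposed workaround, a channel added via a special message might not be
# subscribed yet when group_send() runs, so it could miss "hi".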

One idea, trying to solve it, would be for the caller of group_add() to wait for an acknowledgement message from the other consumer, saying "got it. I added myself like you told me to". That way, the caller of await group_add(...) would know that when it returns, it happened. But... here is where the real complexity comes in. What if you never get an ack? Entirely possible because the other consumer might have exited already. So maybe we have a timeout? This is where it begins to feel extremely error-prone.
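
Just to sketch what that would involve (entirely hypothetical, not something in this PR):

import asyncio

async def group_add_remote(channel_layer, group, channel, ack_queue, timeout=2.0):
    # Ask the remote consumer to subscribe itself...
    await channel_layer.send(channel, {"type": "join_group", "group_name": group})
    try:
        # ...then block until it acknowledges (ack_queue is hypothetical plumbing).
        await asyncio.wait_for(ack_queue.get(), timeout)
    except asyncio.TimeoutError:
        # The remote consumer may already be gone; the caller cannot tell which case it is.
        raise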

@aryan340 Out of curiosity, do you need this functionality in your app? I.e. adding channels to groups from the "outside"?

acu192 (Collaborator, Author) commented Apr 29, 2021

I just added a runtime check to raise on the known limitation of this implementation (i.e. not being able to do a group_add(...) on channels that are not in-process).

I propose we merge this PR with this known limitation. It is a low-risk merge (it does not change any existing code) and we can continue improving it over time. @aryan340 Perhaps after this PR is merged, you can send in a new PR with the workaround you proposed?

Maintainers, what do you think?

aryaniyaps commented:

@aryan340 Out of curiosity, do you need this functionality in your app? I.e. adding channels to groups from the "outside"?

Yes, I would need that. My use case: users are part of boxes. Whenever they join a box, I pick that up via a listener and add the user to the associated group, so this needs to be done outside the consumer.
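
For context, such a listener would call the layer directly from outside any consumer, roughly like this (look_up_channel_name() is hypothetical; the channel name would have to be stored somewhere when the consumer connected):

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

def on_user_joined_box(user, box):
    # Ordinary Django code, outside any consumer.
    channel_layer = get_channel_layer()
    channel_name = look_up_channel_name(user)  # hypothetical storage lookup
    async_to_sync(channel_layer.group_add)(f"box.{box.pk}", channel_name)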

This is already the case (via self.channel_name which is created by the channel layer's new_channel() method). But maybe I'm not understanding what your proposing; are you proposing that each consumer has a second private channel?

Sorry, I meant that each consumer must have its own private group.

aryaniyaps commented Apr 29, 2021

One idea, trying to solve it, would be for the caller of group_add() to wait for an acknowledgement message from the other consumer, saying "got it. I added myself like you told me to". That way, the caller of await group_add(...) would know that when it returns, it happened. But... here is where the real complexity comes in. What if you never get an ack? Entirely possible because the other consumer might have exited already. So maybe we have a timeout? This is where it begins to feel extremely error-prone

You can configure Redis Pub/Sub clients to listen for subscribe or unsubscribe messages. Every time a client subscribes, a confirmation message is sent. This way we can know when the consumer has finished subscribing.

(edit) Adding onto my previous statement: for use cases where you need to send a message immediately, you can check for these confirmation messages from the client, or you can check whether the channel is already in the client's list of subscribed channels.
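
As a sketch of the second option (using redis-py's asyncio client rather than the aioredis pinned by this PR; note the raw Pub/Sub channel name may be prefixed by the layer and differ from the Channels channel name):

import redis.asyncio as redis

async def is_subscribed(raw_channel: str) -> bool:
    # PUBSUB NUMSUB reports how many clients are subscribed to a channel,
    # so a caller could poll this before an immediate group_send().
    r = redis.Redis()
    ((_, count),) = await r.pubsub_numsub(raw_channel)
    return count > 0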

aryaniyaps commented:

@acu192 I am okay if this PR doesn't provide external group-add support out of the box.
Based on my idea, anyone can just write handlers for the above use case, and it's not a big deal.
But if you insist, I can make a PR too.

acu192 (Collaborator, Author) commented Apr 30, 2021

But if you insist, I can make a PR too.

No, I'm not insisting; it would be totally up to you if you wanted to try to extend this implementation with your idea.

But first we need the maintainers' buy-in to this PR.

carltongibson (Member) commented:

Hey @acu192 -- thanks for this. Looks good.

Let me have a play.

I think we should probably go with it as an option for folks.

Nice.

acu192 (Collaborator, Author) commented Apr 30, 2021

@carltongibson Thanks for considering this PR! I'm excited to get it integrated and help maintain it over time.

If you want a simple project to play around with this, I copied this implementation into this repo:
https://github.com/acu192/channels-bug

You can easily change which channel layer is used (it's a single line of code in that repo). Currently it is set to use this new implementation, as you'll see. Maybe that gives you an easy way to play around with it.

acu192 (Collaborator, Author) commented Apr 30, 2021

@carltongibson Also, after this is merged, I'll create issues in this repo to document the features that are still needed in this implementation to bring it in-line with the original implementation. I think it's best if we address those additional features in future PRs, to take this incrementally.

yedpodtrzitko (Contributor) commented:

Nicely done. I have just a few nitpicks about the code; feel free to disregard them if you don't consider them important.

Would it be possible to have a basic test added too, please?

acu192 (Collaborator, Author) commented May 9, 2021

@yedpodtrzitko Your suggestions were fixed in the two commits above. Thank you!

acu192 added a commit to acu192/channels-bug that referenced this pull request May 9, 2021

carltongibson (Member) left a comment:

Hi @acu192 — thanks for this. It looks nice.

Sorry for the delay. It's been hectic.

Some initial comments:

  • I'm going to pull in "Added Redis sentinel support" (#250). Can you look out for that and rebase?
  • alt.py is probably better as pubsub.py, no?
  • For tests, do you think we can parametrize the test_core tests? (i.e. we just want to check the channel layer API works, so running the base tests on both channel layers seems desirable; see the sketch after this list.)
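
Roughly what that parametrisation could look like (a sketch only; the pubsub module name is assumed from the rename suggested above, and the real test_core fixtures are more involved than this):

import pytest

from channels_redis.core import RedisChannelLayer
from channels_redis.pubsub import RedisPubSubChannelLayer

@pytest.fixture(params=[RedisChannelLayer, RedisPubSubChannelLayer])
def channel_layer(request):
    return request.param(hosts=[("localhost", 6379)])

@pytest.mark.asyncio
async def test_send_receive(channel_layer):
    channel = await channel_layer.new_channel()
    await channel_layer.send(channel, {"type": "test.message"})
    assert (await channel_layer.receive(channel))["type"] == "test.message"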

Current plan is to go over the open PRs and then pull together a release. Thanks!

acu192 (Collaborator, Author) commented May 12, 2021

@carltongibson The first two tasks are done:

  1. This PR is rebased. It now includes the changes from the other PR ("Added Redis sentinel support", #250), which, TBH, is not what I expected (maybe I did it wrong). If that's a problem, I can close this PR and open a new one that is "fresh" with just my changes.
  2. I agree. I renamed alt.py to pubsub.py.

For the tests, I'll look into that next...

acu192 (Collaborator, Author) commented May 12, 2021

Okay, looking at the tests, there are a few internal (non-API) differences that make it hard to reuse the exact same test code, for example the use of .client_prefix and .close_pools().

Another issue is that this pub/sub impl requires that you call .new_channel() before you call .receive(). This is because we "subscribe" to specific channels from inside .new_channel(). This works fine in production (given how Django Channels uses the API) but many of the tests in test_core do not do this.

Another issue is that Redis Pub/Sub does not have a unique keyspace per database, so the trick that test_core uses for multi-database testing will not work here. See the Redis Pub/Sub documentation, section "Database & Scoping", which says:

Pub/Sub has no relation to the key space. It was made to not interfere with it on any level, including database numbers. ... Publishing on db 10, will be heard by a subscriber on db 1.
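
To make that concrete, a quick sketch (using redis-py's asyncio client for brevity, not the aioredis version used in this PR):

import asyncio
import redis.asyncio as redis

async def main():
    sub = redis.Redis(db=1)
    pub = redis.Redis(db=10)
    async with sub.pubsub() as p:
        await p.subscribe("test-channel")
        await pub.publish("test-channel", "hello")
        # Despite the different database numbers, the subscriber still receives it:
        print(await p.get_message(ignore_subscribe_messages=True, timeout=1.0))

asyncio.run(main())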

There may be other issues, but I stopped at this point. How about I write a new test suite (tests/test_pubsub.py) to test this new impl and ensure it implements the API correctly (now and in the future)?

carltongibson (Member) commented:

How about I write a new test suite (tests/test_pubsub.py) to test this new impl and ensure it implements the API correctly (now and in the future)?

OK, that sounds perfect. (The thought to parametrise was just about saving a bit of work, but yes, it’s not at the right level for that.)

Please, yes, perhaps create a fresh PR, as there’s a lot of noise here now — I think you merged, rather than rebased, at the end there — and a fresh one-or-two-commit PR will be easier to review.

Thanks! (This is quite exciting 😄)

acu192 (Collaborator, Author) commented May 13, 2021

Here is a link to the new (cleaned-up) PR for this feature: #251

acu192 (Collaborator, Author) commented Oct 24, 2021

@yedpodtrzitko Thanks again for your review and feedback on this initial PR. In case you haven't been following the progress of the RedisPubSubChannelLayer since then: with the 3.3.1 release (which fixed a few bugs), please give it a try now if you want.
