Scalability #6

Open
mattbasta opened this issue Feb 12, 2020 · 5 comments · May be fixed by #7

Comments

@mattbasta

Hi there! I started using this for a new project I'm working on and it perfectly suits my needs.

One thing I'm concerned about over time is that all events are broadcast over the same channel (subscriptions?). For the purposes of my app, which implements a chat-like interface over a single Message model, this would—to my understanding—require every message to be received and filtered by every instance of the application. That is, if there are 1000 users sending 10 messages per minute across 5 servers, each server would need to receive and process 10,000 events per minute (166 per second). Ideally, if each user is connected to one server via a websocket, and the recipient is connected to another server via a websocket, each server is only processing 4000 events per minute (1000 users spread over 5 servers == 200 users/server, with each user sending/receiving 20 messages == 4,000 per minute or ~66/second). Adding servers in this case would significantly decrease the number of events per server per minute.
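
To put rough numbers on that, here's a back-of-the-envelope comparison (illustrative only, nothing library-specific):

# Illustrative arithmetic only; the numbers mirror the example above.
users = 1000
servers = 5
messages_per_user_per_minute = 10

total_events_per_minute = users * messages_per_user_per_minute  # 10,000

# Today: every server receives every event.
broadcast_per_server = total_events_per_minute  # 10,000/min

# Ideal: each server only handles events for its own connected users
# (each user both sends and receives, hence the factor of 2).
partitioned_per_server = (users // servers) * messages_per_user_per_minute * 2

print(broadcast_per_server / 60)    # ~166 events/sec per server
print(partitioned_per_server / 60)  # ~66 events/sec per server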

I guess my first question is, have you considered anything to work around this? My initial thought is to create factories that return the equivalent of post_save_subscription or post_delete_subscription. So instead of

post_save.connect(post_save_subscription, sender=Message, dispatch_uid="message_post_save")
post_delete.connect(post_delete_subscription, sender=Message, dispatch_uid="message_post_delete")

you might write

def channel_map(event):
    return f'chat_room::{event.instance.room.id}'

post_save_subscription, post_delete_subscription = \
    get_signal_handlers(mapper=channel_map)

post_save.connect(post_save_subscription, sender=Message, dispatch_uid="message_post_save")
post_delete.connect(post_delete_subscription, sender=Message, dispatch_uid="message_post_delete")
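
For what it's worth, here's a very rough sketch of what such a factory could look like. To be clear, get_signal_handlers is hypothetical, it assumes the events module exposes UPDATED and DELETED alongside CREATED, and it still needs some way to route an event to the group returned by mapper(event), which is exactly the piece that doesn't exist today:

# Hypothetical sketch only: get_signal_handlers is not part of the library.
from graphene_subscriptions.events import (
    ModelSubscriptionEvent,
    CREATED,
    UPDATED,
    DELETED,
)

def get_signal_handlers(mapper):
    def handle_save(sender, instance, created, **kwargs):
        event = ModelSubscriptionEvent(
            operation=CREATED if created else UPDATED,
            instance=instance,
        )
        # Today this goes to the global "subscriptions" group; ideally it
        # would be delivered only to the group named by mapper(event).
        event.send()

    def handle_delete(sender, instance, **kwargs):
        event = ModelSubscriptionEvent(operation=DELETED, instance=instance)
        event.send()

    return handle_save, handle_delete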

However, it's unclear how one might group_add in the consumer when a client subscribes. I don't know much about the internals of the underlying graphql library (yet), so it's tough for me to imagine how it would be possible to, for instance, call root.join_channel(f'chat_room::{room_id}').filter(...).
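
For reference, the Channels primitive this would ultimately need to call is group_add. A hypothetical consumer method, just to show the shape (ChatConsumer and join_room are made up for illustration):

from channels.generic.websocket import AsyncJsonWebsocketConsumer

class ChatConsumer(AsyncJsonWebsocketConsumer):
    async def join_room(self, room_id):
        # Add this connection's channel to a per-room group so that only
        # servers with subscribers in that room receive its events.
        await self.channel_layer.group_add(
            f"chat_room::{room_id}", self.channel_name
        )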

Long story short, I'd be interested in contributing towards this if there was an obvious way to expose some notion of channels in the existing interface. I'd love to hear your thoughts on how this could be accomplished and whether you'd accept PRs for an implementation.

@jaydenwindle
Owner

jaydenwindle commented Feb 13, 2020

Hey @mattbasta! Thanks for using graphene-subscriptions. I'm glad it fits your use case well :)

You're correct - right now all events are broadcast to a single Channel Layer group called subscriptions, which every client is added to when they establish a websocket connection. This is not ideal, since it renders horizontal scaling mostly ineffective. This is definitely low-hanging fruit for optimization.

I think part of the problem here is how graphene-subscriptions tries to abstract away the concept of groups and signals from the user. This makes it difficult to drop down to the group / signal layer and implement custom logic.

I'm actually considering a fairly significant re-write of this library which will remove Channel Layers and replace it with a pub/sub implementation based on aioredis. This will give users much more control over how subscriptions are fired at a lower level. With this new architecture, implementing a subscription would look something like this:

# your_app/graphql/subscriptions.py
import graphene
from graphene_subscriptions import PubSub
from your_app.graphql.types import MessageType  # adjust to wherever MessageType lives

class Subscription(graphene.ObjectType):
    message_created = graphene.Field(MessageType, room=graphene.String())

    async def resolve_message_created(root, info, room):
        pubsub = PubSub()
        return await pubsub.subscribe(f"messageCreated.{room}")

# your_app/signals.py
from django.db.models.signals import post_save
from asgiref.sync import async_to_sync
from graphene_subscriptions import PubSub
from your_app.models import Message  # adjust to wherever your Message model lives

def on_message_created(sender, instance, created, **kwargs):
    pubsub = PubSub()

    if created:
        async_to_sync(pubsub.publish)(
            f"messageCreated.{instance.room.id}",
            instance
        )
        
post_save.connect(on_message_created, sender=Message, dispatch_uid="message_created")
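
For context, PubSub here would essentially be a thin wrapper around Redis pub/sub. A rough illustration of the idea using the aioredis 1.x API (not the actual implementation; SimplePubSub is just a stand-in name, and message serialization is glossed over):

import aioredis

class SimplePubSub:
    def __init__(self, url="redis://localhost"):
        self.url = url

    async def publish(self, channel, message):
        # message must be str/bytes here; a real implementation would
        # serialize model instances before publishing.
        redis = await aioredis.create_redis(self.url)
        try:
            await redis.publish(channel, message)
        finally:
            redis.close()
            await redis.wait_closed()

    async def subscribe(self, channel):
        redis = await aioredis.create_redis(self.url)
        (ch,) = await redis.subscribe(channel)
        # Yield messages as they arrive; connection cleanup and
        # deserialization are left out for brevity.
        async for message in ch.iter():
            yield message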

This should solve your scaling issue by giving you more control over how clients receive subscription events, and will also fix a number of other issues and anti-patterns that I've noticed while using this library for the last little while. This change will be dependent on graphene-django v3 (not released yet), which adds better support for async execution.

What do you think?

@jaydenwindle
Owner

jaydenwindle commented Feb 14, 2020

@mattbasta After thinking a little more about this last night, I came up with a way to give you more control over client groups in a manner that works with the current library architecture.

With this method, a subscribe function will be passed to each Subscription resolver in the info.context dict. This function can be used to add the client to a group based on the resolver arguments. You could then define your own signal handler and manually fire off a ModelSubscriptionEvent. An extra parameter called group can be passed to ModelSubscriptionEvent, which will send the event to that group instead of the default subscriptions group.

Using this approach, you could accomplish what you're looking for by doing something like this:

# your_app/graphql/subscriptions.py
import graphene
from your_app.graphql.types import MessageType  # adjust to wherever MessageType lives

class MessageCreatedSubscription(graphene.ObjectType):
    message_created = graphene.Field(MessageType, room=graphene.ID())

    def resolve_message_created(root, info, room):
        info.context.subscribe(f"messageCreated.{room}")
        return root.map(lambda event: event.instance)

# your_app/signals.py
from django.db.models.signals import post_save
from graphene_subscriptions.events import ModelSubscriptionEvent, CREATED
from your_app.models import Message  # adjust to wherever your Message model lives

def on_message_created(sender, instance, created, **kwargs):
    if created:
        event = ModelSubscriptionEvent(
            operation=CREATED,
            instance=instance,
            group=f"modelCreated.{instance.room.id}"
        )
        event.send()
        
post_save.connect(on_message_created, sender=Message, dispatch_uid="message_created")
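
One practical note: the handler above only runs if the signals module is actually imported at startup. The usual Django pattern is to import it from AppConfig.ready() (YourAppConfig is just a placeholder name):

# your_app/apps.py
from django.apps import AppConfig

class YourAppConfig(AppConfig):
    name = "your_app"

    def ready(self):
        # Importing the module registers the post_save handler above.
        import your_app.signals  # noqa: F401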

This way, whenever a new message is sent, message events will only be processed by consumers whose clients are subscribed to new messages in that room. This should address the scalability issues while still maintaining backwards compatibility and allowing users to use the global subscriptions channel for simplicity's sake.

I've opened a PR to add this functionality (#7).

Let me know if this will address your scalability concerns until graphene-django v3 drops and I can finish the re-write :)

@mattbasta
Author

That's awesome! I think it definitely addresses my concerns. I'll have a look through the PR either tonight or this weekend. Thanks so much for taking the time to work on this; please let me know if there's anything that you'd be looking for outside help with.

@jaydenwindle
Owner

@mattbasta you're very welcome! Thanks for the feedback :)

If you'd like to help write some docs for this new approach, that would be much appreciated! I haven't quite nailed down what the new API will look like yet, so any input there would be awesome as well.

@jaydenwindle mentioned this issue Feb 18, 2020
@mattbasta
Author

Sorry for the very long delay in replying to this thread. Subscriptions took a back seat to some broader functionality in my app.

Looking at #7, it seems like it should more than satisfy my use case. I like the mixin!
