diff --git a/awx/main/consumers.py b/awx/main/consumers.py index b6d8872ebdc1..d32219b3ac60 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -1,3 +1,5 @@ +import collections +import functools import json import logging import time @@ -12,12 +14,40 @@ from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.layers import get_channel_layer from channels.db import database_sync_to_async +from channels_redis.core import RedisChannelLayer logger = logging.getLogger('awx.main.consumers') XRF_KEY = '_auth_user_xrf' +class BoundedQueue(asyncio.Queue): + + def put_nowait(self, item): + if self.full(): + # dispose the oldest item + # if we actually get into this code block, it likely means that + # this specific consumer has stopped reading + # unfortunately, channels_redis will just happily continue to + # queue messages specific to their channel until the heat death + # of the sun: https://github.com/django/channels_redis/issues/212 + # this isn't a huge deal for browser clients that disconnect, + # but it *does* cause a problem for our global broadcast topic + # that's used to broadcast messages to peers in a cluster + # if we get into this code block, it's better to drop messages + # than to continue to malloc() forever + self.get_nowait() + return super(BoundedQueue, self).put_nowait(item) + + +class ExpiringRedisChannelLayer(RedisChannelLayer): + def __init__(self, *args, **kw): + super(ExpiringRedisChannelLayer, self).__init__(*args, **kw) + self.receive_buffer = collections.defaultdict( + functools.partial(BoundedQueue, self.capacity) + ) + + class WebsocketSecretAuthHelper: """ Middlewareish for websockets to verify node websocket broadcast interconnect. diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 99a392db0c8b..deb94693f545 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -897,7 +897,7 @@ def IS_TESTING(argv=None): CHANNEL_LAYERS = { "default": { - "BACKEND": "channels_redis.core.RedisChannelLayer", + "BACKEND": "awx.main.consumers.ExpiringRedisChannelLayer", "CONFIG": { "hosts": [BROKER_URL], "capacity": 10000,