diff --git a/CHANGELOG.txt b/CHANGELOG.txt index 03921fa..e571977 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,3 +1,9 @@ +UNRELEASED +---------- + +* Ensured per-channel queues are bounded in size to avoid a slow memory leak + if consumers stop reading. + 3.0.1 (2020-07-15) ------------------ diff --git a/channels_redis/core.py b/channels_redis/core.py index cde7962..84cec2f 100644 --- a/channels_redis/core.py +++ b/channels_redis/core.py @@ -2,6 +2,7 @@ import base64 import binascii import collections +import functools import hashlib import itertools import logging @@ -179,6 +180,19 @@ class UnsupportedRedis(Exception): pass +class BoundedQueue(asyncio.Queue): + def put_nowait(self, item): + if self.full(): + # see: https://github.com/django/channels_redis/issues/212 + # if we actually get into this code block, it likely means that + # this specific consumer has stopped reading + # if we get into this code block, it's better to drop messages + # that exceed the channel layer capacity than to continue to + # malloc() forever + self.get_nowait() + return super(BoundedQueue, self).put_nowait(item) + + class RedisChannelLayer(BaseChannelLayer): """ Redis channel layer. @@ -226,7 +240,9 @@ def __init__( # Event loop they are trying to receive on self.receive_event_loop = None # Buffered messages by process-local channel name - self.receive_buffer = collections.defaultdict(asyncio.Queue) + self.receive_buffer = collections.defaultdict( + functools.partial(BoundedQueue, self.capacity) + ) # Detached channel cleanup tasks self.receive_cleaners = [] # Per-channel cleanup locks to prevent a receive starting and moving @@ -544,7 +560,11 @@ async def new_channel(self, prefix="specific"): Returns a new channel name that can be used by something in our process as a specific channel. """ - return "%s.%s!%s" % (prefix, self.client_prefix, uuid.uuid4().hex,) + return "%s.%s!%s" % ( + prefix, + self.client_prefix, + uuid.uuid4().hex, + ) ### Flush extension ### diff --git a/tests/test_core.py b/tests/test_core.py index 29a4f07..9b3b097 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -627,3 +627,17 @@ def test_custom_group_key_format(): channel_layer = RedisChannelLayer(prefix="test_prefix") group_name = channel_layer._group_key("test_group") assert group_name == b"test_prefix:group:test_group" + + +def test_receive_buffer_respects_capacity(): + channel_layer = RedisChannelLayer() + buff = channel_layer.receive_buffer["test-group"] + for i in range(10000): + buff.put_nowait(i) + + capacity = 100 + assert channel_layer.capacity == capacity + assert buff.full() is True + assert buff.qsize() == capacity + messages = [buff.get_nowait() for _ in range(capacity)] + assert list(range(9900, 10000)) == messages