Skip to content

Commit

Permalink
Respect channel layer capacity in RedisChannelLayer.receive_buffer (#…
Browse files Browse the repository at this point in the history
…219)

respect the capacity setting so that the receive_buffer does not grow
without bounds

see: django/channels_redis#212
  • Loading branch information
dudleyhunt86 committed Sep 3, 2020
1 parent 854f91e commit a8e9ce2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
UNRELEASED
----------

* Ensured per-channel queues are bounded in size to avoid a slow memory leak
if consumers stop reading.

3.0.1 (2020-07-15)
------------------

Expand Down
24 changes: 22 additions & 2 deletions channels_redis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64
import binascii
import collections
import functools
import hashlib
import itertools
import logging
Expand Down Expand Up @@ -179,6 +180,19 @@ class UnsupportedRedis(Exception):
pass


class BoundedQueue(asyncio.Queue):
def put_nowait(self, item):
if self.full():
# see: https://github.com/django/channels_redis/issues/212
# if we actually get into this code block, it likely means that
# this specific consumer has stopped reading
# if we get into this code block, it's better to drop messages
# that exceed the channel layer capacity than to continue to
# malloc() forever
self.get_nowait()
return super(BoundedQueue, self).put_nowait(item)


class RedisChannelLayer(BaseChannelLayer):
"""
Redis channel layer.
Expand Down Expand Up @@ -226,7 +240,9 @@ def __init__(
# Event loop they are trying to receive on
self.receive_event_loop = None
# Buffered messages by process-local channel name
self.receive_buffer = collections.defaultdict(asyncio.Queue)
self.receive_buffer = collections.defaultdict(
functools.partial(BoundedQueue, self.capacity)
)
# Detached channel cleanup tasks
self.receive_cleaners = []
# Per-channel cleanup locks to prevent a receive starting and moving
Expand Down Expand Up @@ -544,7 +560,11 @@ async def new_channel(self, prefix="specific"):
Returns a new channel name that can be used by something in our
process as a specific channel.
"""
return "%s.%s!%s" % (prefix, self.client_prefix, uuid.uuid4().hex,)
return "%s.%s!%s" % (
prefix,
self.client_prefix,
uuid.uuid4().hex,
)

### Flush extension ###

Expand Down
14 changes: 14 additions & 0 deletions tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,17 @@ def test_custom_group_key_format():
channel_layer = RedisChannelLayer(prefix="test_prefix")
group_name = channel_layer._group_key("test_group")
assert group_name == b"test_prefix:group:test_group"


def test_receive_buffer_respects_capacity():
channel_layer = RedisChannelLayer()
buff = channel_layer.receive_buffer["test-group"]
for i in range(10000):
buff.put_nowait(i)

capacity = 100
assert channel_layer.capacity == capacity
assert buff.full() is True
assert buff.qsize() == capacity
messages = [buff.get_nowait() for _ in range(capacity)]
assert list(range(9900, 10000)) == messages

0 comments on commit a8e9ce2

Please sign in to comment.