Skip to content

Commit

Permalink
implement a TTL for RedisChannelLayer.receive_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanpetrello committed Jul 27, 2020
1 parent 2075071 commit 44ba516
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion channels_redis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,41 @@ class UnsupportedRedis(Exception):
pass


class ExpiringCache(collections.defaultdict):

def __init__(self, default, ttl=60, *args, **kw):
collections.defaultdict.__init__(self, default)
self._expires = collections.OrderedDict()
self.ttl = ttl

def __setitem__(self, k, v):
collections.defaultdict.__setitem__(self, k, v)
self._expires[k] = time.time() + self.ttl

def __delitem__(self, k):
try:
collections.defaultdict.__delitem__(self, k)
except KeyError:
pass

def expire(self):
expired = []
for k in self._expires.keys():
if self._expires[k] < time.time():
expired.append(k)
else:
# as this is an OrderedDict, every key after this
# was inserted *later*, so if _this_ key is *not* expired,
# the ones after it aren't either (so we can stop iterating)
break
for k in expired:
del self._expires[k]
try:
del self[k]
except KeyError:
pass


class RedisChannelLayer(BaseChannelLayer):
"""
Redis channel layer.
Expand Down Expand Up @@ -226,7 +261,7 @@ def __init__(
# Event loop they are trying to receive on
self.receive_event_loop = None
# Buffered messages by process-local channel name
self.receive_buffer = collections.defaultdict(asyncio.Queue)
self.receive_buffer = ExpiringCache(asyncio.Queue, ttl=self.expiry)
# Detached channel cleanup tasks
self.receive_cleaners = []
# Per-channel cleanup locks to prevent a receive starting and moving
Expand Down Expand Up @@ -616,6 +651,7 @@ async def group_discard(self, group, channel):
key = self._group_key(group)
async with self.connection(self.consistent_hash(group)) as connection:
await connection.zrem(key, channel)
self.receive_buffer.expire()

async def group_send(self, group, message):
"""
Expand Down

0 comments on commit 44ba516

Please sign in to comment.