Skip to content

Commit

Permalink
implement a TTL for RedisChannelLayer.receive_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanpetrello committed Jul 27, 2020
1 parent 2075071 commit 3cfddc9
Showing 1 changed file with 35 additions and 2 deletions.
37 changes: 35 additions & 2 deletions channels_redis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,35 @@ class UnsupportedRedis(Exception):
pass


class ExpiringCache(collections.defaultdict):

def __init__(self, default, ttl=60, *args, **kw):
collections.defaultdict.__init__(self, default)
self._expires = collections.OrderedDict()
self.ttl = ttl

def __setitem__(self, k, v):
collections.defaultdict.__setitem__(self, k, v)
self._expires[k] = time.time() + self.ttl

def expire(self):
expired = []
for k in self._expires.keys():
if self._expires[k] < time.time():
expired.append(k)
else:
# as this is an OrderedDict, every key after this
# was inserted *later*, so if _this_ key is *not* expired,
# the ones after it aren't either (so we can stop iterating)
break
for k in expired:
del self._expires[k]
try:
del self[k]
except KeyError:
pass


class RedisChannelLayer(BaseChannelLayer):
"""
Redis channel layer.
Expand Down Expand Up @@ -226,7 +255,7 @@ def __init__(
# Event loop they are trying to receive on
self.receive_event_loop = None
# Buffered messages by process-local channel name
self.receive_buffer = collections.defaultdict(asyncio.Queue)
self.receive_buffer = ExpiringCache(asyncio.Queue, ttl=self.expiry)
# Detached channel cleanup tasks
self.receive_cleaners = []
# Per-channel cleanup locks to prevent a receive starting and moving
Expand Down Expand Up @@ -459,7 +488,10 @@ async def receive(self, channel):
self.receive_buffer[message_channel].put_nowait(message)
message = None
except Exception:
del self.receive_buffer[channel]
try:
del self.receive_buffer[channel]
except KeyError:
pass
raise
finally:
self.receive_lock.release()
Expand Down Expand Up @@ -616,6 +648,7 @@ async def group_discard(self, group, channel):
key = self._group_key(group)
async with self.connection(self.consistent_hash(group)) as connection:
await connection.zrem(key, channel)
self.receive_buffer.expire()

async def group_send(self, group, message):
"""
Expand Down

0 comments on commit 3cfddc9

Please sign in to comment.