Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide an alternative channel layer implementation that uses Redis Pub/Sub #247

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
60841ea
Alternate channel layer implementation that uses Redis Pub/Sub
acu192 Apr 28, 2021
41f7041
Note in README that there is an alternate implementation
acu192 Apr 28, 2021
0212542
Fix name of main branch in github action
acu192 Apr 28, 2021
633b76a
Ran the `black` command line code formatter
acu192 Apr 28, 2021
43c96b8
Ran the `isort` command line tool to sort imports
acu192 Apr 28, 2021
da820ae
Runtime documentation of a limitation of this implementation
acu192 Apr 29, 2021
e0b24b9
Address @yedpodtrzitko's comments
acu192 May 9, 2021
d008446
Use a logger
acu192 May 9, 2021
436b472
Reformatted by running `black`
acu192 May 9, 2021
0bba3ae
The newer version of pytest-asyncio breaks the tests
acu192 May 9, 2021
3656d87
Added Redis sentinel support. (#250)
carltongibson May 12, 2021
5dd301e
Alternate channel layer implementation that uses Redis Pub/Sub
acu192 Apr 28, 2021
e255859
Note in README that there is an alternate implementation
acu192 Apr 28, 2021
4fef58d
Fix name of main branch in github action
acu192 Apr 28, 2021
9de8911
Ran the `black` command line code formatter
acu192 Apr 28, 2021
7d929f6
Ran the `isort` command line tool to sort imports
acu192 Apr 28, 2021
91f81a3
Runtime documentation of a limitation of this implementation
acu192 Apr 29, 2021
cea3aef
Address @yedpodtrzitko's comments
acu192 May 9, 2021
341cd41
Use a logger
acu192 May 9, 2021
f7fa8e5
Reformatted by running `black`
acu192 May 9, 2021
a832772
Merge branch 'main' of https://github.com/acu192/channels_redis into …
acu192 May 12, 2021
1deb608
Rename alt.py to pubsub.py
acu192 May 12, 2021
8d0fad2
Support the "flush" extension
acu192 May 12, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Tests
on:
push:
branches:
- master
- main
pull_request:

jobs:
Expand All @@ -18,7 +18,6 @@ jobs:
- 3.7
- 3.8
- 3.9

services:
redis:
image: redis
Expand All @@ -29,6 +28,19 @@ jobs:
--health-interval 10s
--health-timeout 5s
--health-retries 5
sentinel:
image: bitnami/redis-sentinel
ports:
- 26379:26379
options: >-
--health-cmd "redis-cli -p 26379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
REDIS_MASTER_HOST: redis
REDIS_MASTER_SET: sentinel
REDIS_SENTINEL_QUORUM: "1"

steps:
- uses: actions/checkout@v2
Expand Down
17 changes: 17 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ Set up the channel layer in your Django settings file like so::
},
}

Or, you can use the alternate implementation which uses Redis Pub/Sub::

CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
"CONFIG": {
"hosts": [("localhost", 6379)],
},
},
}

Possible options for ``CONFIG`` are listed below.

``hosts``
Expand All @@ -40,6 +51,12 @@ The server(s) to connect to, as either URIs, ``(host, port)`` tuples, or dicts c
Defaults to ``['localhost', 6379]``. Pass multiple hosts to enable sharding,
but note that changing the host list will lose some sharded data.

Sentinel connections require dicts conforming to `create_sentinel
<https://aioredis.readthedocs.io/en/v1.3.0/sentinel.html#aioredis.sentinel.
create_sentinel>` with an additional `master_name` key specifying the Sentinel
master set. Plain Redis and Sentinel connections can be mixed and matched if
sharding.

If your server is listening on a UNIX domain socket, you can also use that to connect: ``["unix:///path/to/redis.sock"]``.
This should be slightly faster than a loopback TCP connection.

Expand Down
53 changes: 38 additions & 15 deletions channels_redis/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ def _wrapper(self, *args, **kwargs):
class ConnectionPool:
"""
Connection pool manager for the channel layer.

It manages a set of connections for the given host specification and
taking into account asyncio event loops.
"""

def __init__(self, host):
self.host = host
self.host = host.copy()
self.master_name = self.host.pop("master_name", None)
self.conn_map = {}
self.sentinel_map = {}
self.in_use = {}

def _ensure_loop(self, loop):
Expand All @@ -68,16 +69,27 @@ def _ensure_loop(self, loop):

return self.conn_map[loop], loop

async def create_conn(self, loop):
# One connection per pool since we are emulating a single connection
kwargs = {"minsize": 1, "maxsize": 1, **self.host}
if not (sys.version_info >= (3, 8, 0) and AIOREDIS_VERSION >= (1, 3, 1)):
kwargs["loop"] = loop
if self.master_name is None:
return await aioredis.create_redis_pool(**kwargs)
else:
kwargs = {"timeout": 2, **kwargs} # aioredis default is way too low
sentinel = await aioredis.sentinel.create_sentinel(**kwargs)
conn = sentinel.master_for(self.master_name)
self.sentinel_map[conn] = sentinel
return conn

async def pop(self, loop=None):
"""
Get a connection for the given identifier and loop.
"""
conns, loop = self._ensure_loop(loop)
if not conns:
if sys.version_info >= (3, 8, 0) and AIOREDIS_VERSION >= (1, 3, 1):
conn = await aioredis.create_redis(**self.host)
else:
conn = await aioredis.create_redis(**self.host, loop=loop)
conn = await self.create_conn(loop)
conns.append(conn)
conn = conns.pop()
if conn.closed:
Expand All @@ -96,48 +108,58 @@ def push(self, conn):
conns, _ = self._ensure_loop(loop)
conns.append(conn)

def conn_error(self, conn):
async def conn_error(self, conn):
"""
Handle a connection that produced an error.
"""
conn.close()
await self._close_conn(conn)
del self.in_use[conn]

def reset(self):
"""
Clear all connections from the pool.
"""
self.conn_map = {}
self.sentinel_map = {}
self.in_use = {}

async def _close_conn(self, conn, sentinel_map=None):
if sentinel_map is None:
sentinel_map = self.sentinel_map
if conn in sentinel_map:
sentinel_map[conn].close()
await sentinel_map[conn].wait_closed()
del sentinel_map[conn]
conn.close()
await conn.wait_closed()

async def close_loop(self, loop):
"""
Close all connections owned by the pool on the given loop.
"""
if loop in self.conn_map:
for conn in self.conn_map[loop]:
conn.close()
await conn.wait_closed()
await self._close_conn(conn)
del self.conn_map[loop]

for k, v in self.in_use.items():
if v is loop:
await self._close_conn(k)
self.in_use[k] = None

async def close(self):
"""
Close all connections owned by the pool.
"""
conn_map = self.conn_map
sentinel_map = self.sentinel_map
in_use = self.in_use
self.reset()
for conns in conn_map.values():
for conn in conns:
conn.close()
await conn.wait_closed()
await self._close_conn(conn, sentinel_map)
for conn in in_use:
conn.close()
await conn.wait_closed()
await self._close_conn(conn, sentinel_map)


class ChannelLock:
Expand Down Expand Up @@ -262,6 +284,7 @@ def decode_hosts(self, hosts):
raise ValueError(
"You must pass a list of Redis hosts, even if there is only one."
)

# Decode each hosts entry into a kwargs dict
result = []
for entry in hosts:
Expand Down Expand Up @@ -888,7 +911,7 @@ async def __aenter__(self):

async def __aexit__(self, exc_type, exc, tb):
if exc:
self.pool.conn_error(self.conn)
await self.pool.conn_error(self.conn)
else:
self.pool.push(self.conn)
self.conn = None
Loading