Skip to content

Commit

Permalink
Fixed #32172 -- Adapted signals to allow async handlers.
Browse files Browse the repository at this point in the history
co-authored-by: kozzztik <[email protected]>
co-authored-by: Carlton Gibson <[email protected]>
  • Loading branch information
3 people authored and felixxm committed Mar 7, 2023
1 parent 9a07999 commit e83a885
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 43 deletions.
4 changes: 1 addition & 3 deletions django/core/handlers/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ async def handle(self, scope, receive, send):
return
# Request is complete and can be served.
set_script_prefix(self.get_script_prefix(scope))
await sync_to_async(signals.request_started.send, thread_sensitive=True)(
sender=self.__class__, scope=scope
)
await signals.request_started.asend(sender=self.__class__, scope=scope)
# Get the request and check for basic issues.
request, error_response = self.create_request(scope, body_file)
if request is None:
Expand Down
233 changes: 209 additions & 24 deletions django/dispatch/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import asyncio
import logging
import threading
import weakref

from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async

from django.utils.inspect import func_accepts_kwargs

logger = logging.getLogger("django.dispatch")
Expand Down Expand Up @@ -52,7 +55,8 @@ def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
receiver
A function or an instance method which is to receive signals.
Receivers must be hashable objects.
Receivers must be hashable objects. Receivers can be
asynchronous.
If weak is True, then receiver must be weak referenceable.
Expand Down Expand Up @@ -94,6 +98,8 @@ def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
else:
lookup_key = (_make_id(receiver), _make_id(sender))

is_async = iscoroutinefunction(receiver)

if weak:
ref = weakref.ref
receiver_object = receiver
Expand All @@ -106,8 +112,8 @@ def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):

with self.lock:
self._clear_dead_receivers()
if not any(r_key == lookup_key for r_key, _ in self.receivers):
self.receivers.append((lookup_key, receiver))
if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
self.receivers.append((lookup_key, receiver, is_async))
self.sender_receivers_cache.clear()

def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
Expand Down Expand Up @@ -138,7 +144,7 @@ def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
with self.lock:
self._clear_dead_receivers()
for index in range(len(self.receivers)):
(r_key, _) = self.receivers[index]
r_key, *_ = self.receivers[index]
if r_key == lookup_key:
disconnected = True
del self.receivers[index]
Expand All @@ -147,7 +153,8 @@ def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
return disconnected

def has_listeners(self, sender=None):
return bool(self._live_receivers(sender))
sync_receivers, async_receivers = self._live_receivers(sender)
return bool(sync_receivers) or bool(async_receivers)

def send(self, sender, **named):
"""
Expand All @@ -157,6 +164,10 @@ def send(self, sender, **named):
terminating the dispatch loop. So it's possible that all receivers
won't be called if an error is raised.
If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
Arguments:
sender
Expand All @@ -172,16 +183,97 @@ def send(self, sender, **named):
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
responses = []
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
if async_receivers:

async def asend():
async_responses = await asyncio.gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
)
return zip(async_receivers, async_responses)

responses.extend(async_to_sync(asend)())
return responses

async def asend(self, sender, **named):
"""
Send signal from sender to all connected receivers in async mode.
All sync receivers will be wrapped by sync_to_async()
If any receiver raises an error, the error propagates back through
send, terminating the dispatch loop. So it's possible that all
receivers won't be called if an error is raised.
return [
(receiver, receiver(signal=self, sender=sender, **named))
for receiver in self._live_receivers(sender)
]
If any receivers are synchronous, they are grouped and called behind a
sync_to_async() adaption before executing any asynchronous receivers.
If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather().
Arguments:
sender
The sender of the signal. Either a specific object or None.
named
Named arguments which will be passed to receivers.
Return a list of tuple pairs [(receiver, response), ...].
"""
if (
not self.receivers
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []
sync_receivers, async_receivers = self._live_receivers(sender)
if sync_receivers:

@sync_to_async
def sync_send():
responses = []
for receiver in sync_receivers:
response = receiver(signal=self, sender=sender, **named)
responses.append((receiver, response))
return responses

else:
sync_send = list

responses, async_responses = await asyncio.gather(
sync_send(),
asyncio.gather(
*(
receiver(signal=self, sender=sender, **named)
for receiver in async_receivers
)
),
)
responses.extend(zip(async_receivers, async_responses))
return responses

def _log_robust_failure(self, receiver, err):
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)

def send_robust(self, sender, **named):
"""
Send signal from sender to all connected receivers catching errors.
If any receivers are asynchronous, they are called after all the
synchronous receivers via a single call to async_to_sync(). They are
also executed concurrently with asyncio.gather().
Arguments:
sender
Expand All @@ -206,19 +298,105 @@ def send_robust(self, sender, **named):
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in self._live_receivers(sender):
sync_receivers, async_receivers = self._live_receivers(sender)
for receiver in sync_receivers:
try:
response = receiver(signal=self, sender=sender, **named)
except Exception as err:
logger.error(
"Error calling %s in Signal.send_robust() (%s)",
receiver.__qualname__,
err,
exc_info=err,
)
self._log_robust_failure(receiver, err)
responses.append((receiver, err))
else:
responses.append((receiver, response))
if async_receivers:

async def asend_and_wrap_exception(receiver):
try:
response = await receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
return err
return response

async def asend():
async_responses = await asyncio.gather(
*(
asend_and_wrap_exception(receiver)
for receiver in async_receivers
)
)
return zip(async_receivers, async_responses)

responses.extend(async_to_sync(asend)())
return responses

async def asend_robust(self, sender, **named):
"""
Send signal from sender to all connected receivers catching errors.
If any receivers are synchronous, they are grouped and called behind a
sync_to_async() adaption before executing any asynchronous receivers.
If any receivers are asynchronous, they are grouped and executed
concurrently with asyncio.gather.
Arguments:
sender
The sender of the signal. Can be any Python object (normally one
registered with a connect if you actually want something to
occur).
named
Named arguments which will be passed to receivers.
Return a list of tuple pairs [(receiver, response), ... ].
If any receiver raises an error (specifically any subclass of
Exception), return the error instance as the result for that receiver.
"""
if (
not self.receivers
or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
):
return []

# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
sync_receivers, async_receivers = self._live_receivers(sender)

if sync_receivers:

@sync_to_async
def sync_send():
responses = []
for receiver in sync_receivers:
try:
response = receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
responses.append((receiver, err))
else:
responses.append((receiver, response))
return responses

else:
sync_send = list

async def asend_and_wrap_exception(receiver):
try:
response = await receiver(signal=self, sender=sender, **named)
except Exception as err:
self._log_robust_failure(receiver, err)
return err
return response

responses, async_responses = await asyncio.gather(
sync_send(),
asyncio.gather(
*(asend_and_wrap_exception(receiver) for receiver in async_receivers),
),
)
responses.extend(zip(async_receivers, async_responses))
return responses

def _clear_dead_receivers(self):
Expand All @@ -244,31 +422,38 @@ def _live_receivers(self, sender):
# We could end up here with NO_RECEIVERS even if we do check this case in
# .send() prior to calling _live_receivers() due to concurrent .send() call.
if receivers is NO_RECEIVERS:
return []
return [], []
if receivers is None:
with self.lock:
self._clear_dead_receivers()
senderkey = _make_id(sender)
receivers = []
for (receiverkey, r_senderkey), receiver in self.receivers:
for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
if r_senderkey == NONE_ID or r_senderkey == senderkey:
receivers.append(receiver)
receivers.append((receiver, is_async))
if self.use_caching:
if not receivers:
self.sender_receivers_cache[sender] = NO_RECEIVERS
else:
# Note, we must cache the weakref versions.
self.sender_receivers_cache[sender] = receivers
non_weak_receivers = []
for receiver in receivers:
non_weak_sync_receivers = []
non_weak_async_receivers = []
for receiver, is_async in receivers:
if isinstance(receiver, weakref.ReferenceType):
# Dereference the weak reference.
receiver = receiver()
if receiver is not None:
non_weak_receivers.append(receiver)
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
else:
non_weak_receivers.append(receiver)
return non_weak_receivers
if is_async:
non_weak_async_receivers.append(receiver)
else:
non_weak_sync_receivers.append(receiver)
return non_weak_sync_receivers, non_weak_async_receivers

def _remove_receiver(self, receiver=None):
# Mark that the self.receivers list has dead weakrefs. If so, we will
Expand Down
4 changes: 1 addition & 3 deletions django/test/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ async def __call__(self, scope):
body_file = FakePayload("")

request_started.disconnect(close_old_connections)
await sync_to_async(request_started.send, thread_sensitive=False)(
sender=self.__class__, scope=scope
)
await request_started.asend(sender=self.__class__, scope=scope)
request_started.connect(close_old_connections)
# Wrap FakePayload body_file to allow large read() in test environment.
request = ASGIRequest(scope, LimitedStream(body_file, len(body_file)))
Expand Down
2 changes: 1 addition & 1 deletion django/test/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def complex_setting_changed(*, enter, setting, **kwargs):
# this stacklevel shows the line containing the override_settings call.
warnings.warn(
f"Overriding setting {setting} can lead to unexpected behavior.",
stacklevel=6,
stacklevel=5,
)


Expand Down
4 changes: 3 additions & 1 deletion docs/releases/5.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ Serialization
Signals
~~~~~~~

* ...
* The new :meth:`.Signal.asend` and :meth:`.Signal.asend_robust` methods allow
asynchronous signal dispatch. Signal receivers may be synchronous or
asynchronous, and will be automatically adapted to the correct calling style.

Templates
~~~~~~~~~
Expand Down
2 changes: 2 additions & 0 deletions docs/topics/async.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ synchronous function and call it using :func:`sync_to_async`.

Asynchronous model and related manager interfaces were added.

.. _async_performance:

Performance
-----------

Expand Down
Loading

0 comments on commit e83a885

Please sign in to comment.