-
Notifications
You must be signed in to change notification settings - Fork 452
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
166 additions
and
103 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,53 +1,44 @@ | ||
""" | ||
Notifier. | ||
Author(s): Vadim Bulavintsev | ||
""" | ||
import logging | ||
from asyncio import get_event_loop | ||
|
||
from tribler_common.simpledefs import NTFY | ||
from collections import defaultdict | ||
from typing import Callable, Dict | ||
|
||
|
||
class Notifier: | ||
|
||
def __init__(self): | ||
self._logger = logging.getLogger(self.__class__.__name__) | ||
self.observers = {} | ||
self.logger = logging.getLogger(self.__class__.__name__) | ||
self.observers: Dict[str, set] = defaultdict(set) | ||
# @ichorid: | ||
# We have to note the event loop reference, because when we call "notify" from an external thread, | ||
# we don't know anything about the existence of the event loop, and get_event_loop() can't find | ||
# the original event loop from an external thread. | ||
self._loop = get_event_loop() | ||
# We remember the event loop from the thread that runs the Notifier | ||
# to be able to schedule notifications from external threads | ||
self._loop = get_event_loop() | ||
|
||
def add_observer(self, subject, callback): | ||
assert isinstance(subject, NTFY) | ||
self.observers[subject] = self.observers.get(subject, []) | ||
self.observers[subject].append(callback) | ||
self._logger.debug(f"Add observer topic {subject} callback {callback}") | ||
|
||
def remove_observer(self, subject, callback): | ||
if subject not in self.observers: | ||
return | ||
if callback not in self.observers[subject]: | ||
return | ||
|
||
self.observers[subject].remove(callback) | ||
self._logger.debug(f"Remove observer topic {subject} callback {callback}") | ||
|
||
def notify(self, subject, *args): | ||
# We have to call the notifier callbacks through call_soon_threadsafe | ||
# because the notify method could have been called from a non-reactor thread | ||
self._loop.call_soon_threadsafe(self._notify, subject, *args) | ||
|
||
def _notify(self, subject, *args): | ||
if subject not in self.observers: | ||
self._logger.warning(f"Called notification on a non-existing subject {subject}") | ||
return | ||
for callback in self.observers[subject]: | ||
callback(*args) | ||
|
||
def notify_shutdown_state(self, state): | ||
self._logger.info("Tribler shutdown state notification:%s", state) | ||
self.notify(NTFY.TRIBLER_SHUTDOWN_STATE, state) | ||
def add_observer(self, topic: str, callback: Callable): | ||
self.logger.debug(f"Add observer topic {topic}") | ||
self.observers[topic].add(callback) | ||
|
||
def remove_observer(self, topic: str, callback: Callable): | ||
self.logger.debug(f"Remove observer topic {topic}") | ||
self.observers[topic].discard(callback) | ||
|
||
def notify(self, topic: str, *args, **kwargs): | ||
def _notify(_topic, _kwargs, *_args): | ||
for callback in self.observers[_topic]: | ||
try: | ||
callback(*_args, **_kwargs) | ||
except Exception as _e: # pylint: disable=broad-except | ||
self.logger.exception(_e) | ||
|
||
try: | ||
# @ichorid: | ||
# We have to call the notifier callbacks through call_soon_threadsafe | ||
# because the notify method could have been called from a non-reactor thread | ||
self._loop.call_soon_threadsafe(_notify, topic, kwargs, *args) | ||
except RuntimeError as e: | ||
# Raises RuntimeError if called on a loop that’s been closed. | ||
# This can happen on a secondary thread when the main application is shutting down. | ||
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe | ||
self.logger.warning(e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.