diff --git a/src/tribler-core/tribler_core/notifier.py b/src/tribler-core/tribler_core/notifier.py index cdefd7fa902..1afc67b4da0 100644 --- a/src/tribler-core/tribler_core/notifier.py +++ b/src/tribler-core/tribler_core/notifier.py @@ -7,7 +7,11 @@ class Notifier: def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) - self.observers: Dict[str, set] = defaultdict(set) + + # we use type Dict for `self.observers` for providing the deterministic order of callbacks + # Therefore `value: bool` here is unnecessary, and it just newer use. + self.observers: Dict[str, Dict[Callable, bool]] = defaultdict(dict) + # @ichorid: # We have to note the event loop reference, because when we call "notify" from an external thread, # we don't know anything about the existence of the event loop, and get_event_loop() can't find @@ -17,26 +21,34 @@ def __init__(self): self._loop = get_event_loop() def add_observer(self, topic: str, callback: Callable): + """ Add the observer for the topic. + Order of the added callbacks will be the same order for the calling the callbacks. + `add_observer` doesn't support duplicated callbacks. + """ self.logger.debug(f"Add observer topic {topic}") - self.observers[topic].add(callback) + self.observers[topic][callback] = True def remove_observer(self, topic: str, callback: Callable): + """ Remove the observer from the topic. In the case of a missed callback no error will be raised. + """ self.logger.debug(f"Remove observer topic {topic}") - self.observers[topic].discard(callback) + self.observers[topic].pop(callback, None) def notify(self, topic: str, *args, **kwargs): - def _notify(_topic, _kwargs, *_args): - for callback in self.observers[_topic]: - try: - callback(*_args, **_kwargs) - except Exception as _e: # pylint: disable=broad-except - self.logger.exception(_e) + """ Notify all observers about the topic. + Each call of observer's callback is isolated and an exception that could + occur in this call will not affect all other calls. + """ try: - # @ichorid: - # We have to call the notifier callbacks through call_soon_threadsafe - # because the notify method could have been called from a non-reactor thread - self._loop.call_soon_threadsafe(_notify, topic, kwargs, *args) + for callback in list(self.observers[topic]): + def _notify(_callback): + _callback(*args, **kwargs) + + # @ichorid: + # We have to call the notifier callbacks through call_soon_threadsafe + # because the notify method could have been called from a non-reactor thread + self._loop.call_soon_threadsafe(_notify, callback) except RuntimeError as e: # Raises RuntimeError if called on a loop that’s been closed. # This can happen on a secondary thread when the main application is shutting down. diff --git a/src/tribler-core/tribler_core/tests/test_notifier.py b/src/tribler-core/tribler_core/tests/test_notifier.py index 859e3c785bd..449249ad839 100644 --- a/src/tribler-core/tribler_core/tests/test_notifier.py +++ b/src/tribler-core/tribler_core/tests/test_notifier.py @@ -5,7 +5,6 @@ from tribler_core.notifier import Notifier - # pylint: disable=redefined-outer-name, protected-access @pytest.fixture @@ -52,14 +51,17 @@ async def test_notifier_remove_nonexistent_observer(notifier: Notifier): @pytest.mark.asyncio async def test_notifier_remove_observer(notifier: Notifier): - def callback(): + def callback1(): ... - notifier.add_observer('topic', lambda: None) - notifier.add_observer('topic', callback) + def callback2(): + ... - notifier.remove_observer('topic', callback) - assert len(notifier.observers['topic']) == 1 + notifier.add_observer('topic', callback1) + notifier.add_observer('topic', callback2) + + notifier.remove_observer('topic', callback1) + assert notifier.observers['topic'] == {callback2: True} @pytest.mark.timeout(1) @@ -87,6 +89,7 @@ async def test_notify_with_exception(notifier: Notifier): notifier.add_observer('topic', side_effect_callback.callback) notifier.add_observer('topic', normal_callback.callback) + notifier.add_observer('topic', side_effect_callback.callback) notifier.notify('topic') @@ -102,6 +105,7 @@ async def test_notify_call_soon_threadsafe_with_exception(notifier: Notifier): notifier.logger = Mock() notifier._loop = Mock(call_soon_threadsafe=Mock(side_effect=RuntimeError)) + notifier.add_observer('topic', lambda: ...) notifier.notify('topic') notifier.logger.warning.assert_called_once()