Skip to content

Commit

Permalink
Implement @kozlovsky's suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Jan 5, 2022
1 parent 69c5dff commit 14a5e21
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
38 changes: 25 additions & 13 deletions src/tribler-core/tribler_core/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
class Notifier:
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.observers: Dict[str, set] = defaultdict(set)

# we use type Dict for `self.observers` for providing the deterministic order of callbacks
# Therefore `value: bool` here is unnecessary, and it just newer use.
self.observers: Dict[str, Dict[Callable, bool]] = defaultdict(dict)

# @ichorid:
# We have to note the event loop reference, because when we call "notify" from an external thread,
# we don't know anything about the existence of the event loop, and get_event_loop() can't find
Expand All @@ -17,26 +21,34 @@ def __init__(self):
self._loop = get_event_loop()

def add_observer(self, topic: str, callback: Callable):
""" Add the observer for the topic.
Order of the added callbacks will be the same order for the calling the callbacks.
`add_observer` doesn't support duplicated callbacks.
"""
self.logger.debug(f"Add observer topic {topic}")
self.observers[topic].add(callback)
self.observers[topic][callback] = True

def remove_observer(self, topic: str, callback: Callable):
""" Remove the observer from the topic. In the case of a missed callback no error will be raised.
"""
self.logger.debug(f"Remove observer topic {topic}")
self.observers[topic].discard(callback)
self.observers[topic].pop(callback, None)

def notify(self, topic: str, *args, **kwargs):
def _notify(_topic, _kwargs, *_args):
for callback in self.observers[_topic]:
try:
callback(*_args, **_kwargs)
except Exception as _e: # pylint: disable=broad-except
self.logger.exception(_e)
""" Notify all observers about the topic.
Each call of observer's callback is isolated and an exception that could
occur in this call will not affect all other calls.
"""
try:
# @ichorid:
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(_notify, topic, kwargs, *args)
for callback in list(self.observers[topic]):
def _notify(_callback):
_callback(*args, **kwargs)

# @ichorid:
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(_notify, callback)
except RuntimeError as e:
# Raises RuntimeError if called on a loop that’s been closed.
# This can happen on a secondary thread when the main application is shutting down.
Expand Down
16 changes: 10 additions & 6 deletions src/tribler-core/tribler_core/tests/test_notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from tribler_core.notifier import Notifier


# pylint: disable=redefined-outer-name, protected-access

@pytest.fixture
Expand Down Expand Up @@ -52,14 +51,17 @@ async def test_notifier_remove_nonexistent_observer(notifier: Notifier):

@pytest.mark.asyncio
async def test_notifier_remove_observer(notifier: Notifier):
def callback():
def callback1():
...

notifier.add_observer('topic', lambda: None)
notifier.add_observer('topic', callback)
def callback2():
...

notifier.remove_observer('topic', callback)
assert len(notifier.observers['topic']) == 1
notifier.add_observer('topic', callback1)
notifier.add_observer('topic', callback2)

notifier.remove_observer('topic', callback1)
assert notifier.observers['topic'] == {callback2: True}


@pytest.mark.timeout(1)
Expand Down Expand Up @@ -87,6 +89,7 @@ async def test_notify_with_exception(notifier: Notifier):

notifier.add_observer('topic', side_effect_callback.callback)
notifier.add_observer('topic', normal_callback.callback)
notifier.add_observer('topic', side_effect_callback.callback)

notifier.notify('topic')

Expand All @@ -102,6 +105,7 @@ async def test_notify_call_soon_threadsafe_with_exception(notifier: Notifier):
notifier.logger = Mock()
notifier._loop = Mock(call_soon_threadsafe=Mock(side_effect=RuntimeError))

notifier.add_observer('topic', lambda: ...)
notifier.notify('topic')

notifier.logger.warning.assert_called_once()

0 comments on commit 14a5e21

Please sign in to comment.