Skip to content

Commit

Permalink
Statically typed notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovsky committed Jan 17, 2022
1 parent f1de8bd commit 68666cb
Show file tree
Hide file tree
Showing 41 changed files with 905 additions and 467 deletions.
83 changes: 83 additions & 0 deletions src/tribler-common/tribler_common/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from ipv8.messaging.anonymization.tunnel import Circuit


# pylint: disable=unused-argument


def torrent_finished(infohash: str, name: str, hidden: bool):
# A torrent has finished downloading. Contains the infohash and the name of the torrent
...


def tribler_shutdown_state(state: str):
# Tribler is going to shutdown
...


def tribler_new_version(version: str):
# A new version of Tribler is available
...


def channel_discovered(data: dict):
# Tribler has discovered a new channel. Contains the channel data
...


def remote_query_results(data: dict):
# Remote GigaChannel search results were received by Tribler. Contains received entries
...


def circuit_removed(circuit: Circuit, additional_info: str):
# Tribler tunnel circuit has been removed (notification to Core)
...


def tunnel_removed(circuit_id: int, bytes_up: int, bytes_down: int, uptime: float, additional_info: str = ''):
# Tribler tunnel circuit has been removed (notification to GUI)
...


def watch_folder_corrupt_file(file_name: str):
# A corrupt .torrent file in the watch folder is found. Contains the name of the corrupt torrent file
...


def channel_entity_updated(channel_update_dict: dict):
# Information about some torrent has been updated (e.g. health). Contains updated torrent data
...


def low_space(disk_usage_data: dict):
# Tribler is low on disk space for storing torrents
...


def events_start(public_key: str, version: str):
...


def tribler_exception(error: dict):
...


def popularity_community_unknown_torrent_added():
...


def report_config_error(error):
# Report config error on startup
...


def peer_disconnected(peer_id: bytes):
...


def tribler_torrent_peer_update(peer_id: bytes, infohash: bytes, balance: int):
...


def torrent_metadata_added(metadata: dict):
...
167 changes: 167 additions & 0 deletions src/tribler-common/tribler_common/notifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import logging
from asyncio import AbstractEventLoop
from collections import defaultdict
from contextlib import contextmanager
from inspect import iscoroutinefunction, signature
from threading import Lock
from typing import Callable, Dict, TypeVar, cast


FuncT = TypeVar("FuncT", bound=Callable[..., None])


class NotifierError(Exception):
pass


class Notifier:
def __init__(self, loop: AbstractEventLoop = None):
self.lock = Lock()
self.logger = logging.getLogger(self.__class__.__name__)

self.topics_by_name: Dict[str, Callable] = {}
# We use the dict type for `self.observers` and `set.generic_observers` instead of the set type to provide
# the deterministic ordering of callbacks. In Python, dictionaries are ordered while sets aren't.
# Therefore, `value: bool` here is unnecessary and is never used.
self.topics: Dict[Callable, Dict[Callable, bool]] = defaultdict(dict)
self.generic_observers: Dict[Callable, bool] = {}
self.interceptors: Dict[Callable, bool] = {}

# @ichorid:
# We have to store the event loop in constructor. Otherwise, get_event_loop() cannot find
# the original event loop when scheduling notifications from the external thread.
self.loop = loop

def add_observer(self, topic: FuncT, observer: FuncT):
""" Add the observer for the topic.
Each callback will be added no more than once. Callbacks are called in the same order as they were added.
"""
topic_signature = signature(topic)
callback_signature = signature(observer)
if topic_signature != callback_signature:
raise TypeError(f'Cannot add observer {observer!r} to topic "{topic.__name__}": '
f'the callback signature {callback_signature} does not match '
f'the topic signature {topic_signature}')

if iscoroutinefunction(topic):
raise TypeError(f"Topic cannot be a coroutine function. Got: {topic!r}")

if iscoroutinefunction(observer):
raise TypeError(f"Observer cannot be a coroutine function. Got: {observer!r}")

if topic is observer:
raise TypeError(f"Topic and observer cannot be the same function. Got: {topic!r}")

self.logger.debug(f"Add observer topic {topic.__name__}")
with self.lock:
topic_name: str = topic.__name__
prev_topic = self.topics_by_name.setdefault(topic_name, topic)
if prev_topic is not topic:
raise NotifierError(f'Cannot register topic {topic!r} because topic name {topic_name} is already taken '
f'by another topic {prev_topic!r}')

self.topics[topic][observer] = True

def remove_observer(self, topic: FuncT, observer: FuncT):
""" Remove the observer from the topic. In the case of a missed callback no error will be raised.
"""
with self.lock:
observers = self.topics[topic]
observers.pop(observer, None)
comment = "" if not observers else f" (it still has {len(observers)} observers)"
self.logger.debug(f"Remove observer {observer!r} from topic {topic.__name__}" + comment)

def add_generic_observer(self, observer: Callable):
self.logger.debug(f"Add generic observer {observer!r}")
with self.lock:
self.generic_observers[observer] = True

def remove_generic_observer(self, observer: Callable):
with self.lock:
self.generic_observers.pop(observer, None)
self.logger.debug(f"Remove generic observer {observer!r}")

def add_interceptor(self, interceptor: Callable):
self.logger.debug(f"Add interceptor {interceptor!r}")
with self.lock:
self.interceptors[interceptor] = True

def remove_interceptor(self, observer: Callable):
with self.lock:
self.interceptors.pop(observer, None)
self.logger.debug(f"Remove interceptor {observer!r}")

def __getitem__(self, topic: FuncT) -> FuncT:
def wrapper(*args, **kwargs):
self.notify(topic, *args, **kwargs)
return cast(FuncT, wrapper)

def notify_by_topic_name(self, topic_name: str, *args, **kwargs):
with self.lock:
topic = self.topics_by_name.get(topic_name)
if topic is None:
self.logger.warning(f'Topic with name {topic_name} not found')
else:
self.notify(topic, *args, **kwargs)

def notify(self, topic: Callable, *args, **kwargs):
""" Notify all observers about the topic.
Сan be called from any thread. Observers will be called from the reactor thread during the next iteration
of the event loop. An exception when an observer is invoked will not affect other observers.
"""
self.logger.debug(f"Notification for topic {topic.__name__}")

topic(*args, **kwargs)

with self.lock:
interceptors = list(self.interceptors)
generic_observers = list(self.generic_observers)
observers = list(self.topics[topic])

suppress = False
for interceptor in interceptors:
if interceptor(topic, *args, **kwargs):
suppress = True
if suppress:
return

for observer in generic_observers:
# The topic is passed twice, the second topic is passed to the generic observer as the first argument
self._notify_threadsafe(topic, observer, topic, *args, **kwargs) # pylint: disable=arguments-out-of-order

for observer in observers:
self._notify_threadsafe(topic, observer, *args, **kwargs)

def _notify_threadsafe(self, topic: Callable, observer: Callable, *args, **kwargs):
if not self.loop:
self._notify(topic, observer, args, kwargs)
else:
try:
self.loop.call_soon_threadsafe(self._notify, topic, observer, args, kwargs)
except RuntimeError as e:
# Raises RuntimeError if called on a loop that’s been closed.
# This can happen on a secondary thread when the main application is shutting down.
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe
self.logger.warning(e)

def _notify(self, topic: Callable, observer: Callable, args: tuple, kwargs: dict):
self.logger.debug(f"Calling observer {observer!r} for topic {topic.__name__}")
try:
observer(*args, **kwargs)
except Exception as e: # pylint: disable=broad-except
self.logger.exception(e)

@contextmanager
def capture_notifications(self, *topics, suppress=True):
notifications = []

def interceptor(topic: Callable, *args, **kwargs):
if not topics or topic in topics:
notifications.append((topic, args, kwargs))
return suppress
return False

self.add_interceptor(interceptor)
yield notifications
self.remove_interceptor(interceptor)
Loading

0 comments on commit 68666cb

Please sign in to comment.