Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make notifier more generic #6702

Merged
merged 2 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY

from tribler_core.components.gigachannel.community.discovery_booster import DiscoveryBooster
from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler_core.components.metadata_store.utils import NoChannelSourcesException
from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler_core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler_core.components.metadata_store.utils import NoChannelSourcesException
from tribler_core.utilities.unicode import hexlify

minimal_blob_size = 200
Expand Down Expand Up @@ -152,7 +152,8 @@ def on_packet_callback(_, processing_results):
)
]
if self.notifier and results:
self.notifier.notify(NTFY.CHANNEL_DISCOVERED, {"results": results, "uuid": str(CHANNELS_VIEW_UUID)})
self.notifier.notify(NTFY.CHANNEL_DISCOVERED.value,
{"results": results, "uuid": str(CHANNELS_VIEW_UUID)})

request_dict = {
"metadata_type": [CHANNEL_TORRENT],
Expand Down Expand Up @@ -211,7 +212,7 @@ def notify_gui(request, processing_results):
]
if self.notifier:
self.notifier.notify(
NTFY.REMOTE_QUERY_RESULTS,
NTFY.REMOTE_QUERY_RESULTS.value,
{"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def _process_download():
updated_channel = self.mds.ChannelMetadata.get(public_key=channel.public_key, id_=channel.id_)
channel_dict = updated_channel.to_simple_dict() if updated_channel else None
if updated_channel:
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED, channel_dict)
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value, channel_dict)

def updated_my_channel(self, tdef):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
from tribler_common.osutils import fix_filebasename
from tribler_common.simpledefs import DLSTATUS_SEEDING, DLSTATUS_STOPPED, DOWNLOAD, NTFY

from tribler_core.exceptions import SaveResumeDataError
from tribler_core.components.libtorrent.download_manager.download_config import DownloadConfig
from tribler_core.components.libtorrent.download_manager.download_state import DownloadState
from tribler_core.components.libtorrent.settings import DownloadDefaultsSettings
from tribler_core.components.libtorrent.download_manager.stream import Stream
from tribler_core.components.libtorrent.settings import DownloadDefaultsSettings
from tribler_core.components.libtorrent.torrentdef import TorrentDef, TorrentDefNoMetainfo
from tribler_core.notifier import Notifier
from tribler_core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt
from tribler_core.utilities.path_util import Path
from tribler_core.components.libtorrent.utils.torrent_utils import check_handle, get_info_from_handle, require_handle
from tribler_core.exceptions import SaveResumeDataError
from tribler_core.notifier import Notifier
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.unicode import ensure_unicode, hexlify
from tribler_core.utilities.utilities import bdecode_compat

Expand Down Expand Up @@ -391,7 +391,7 @@ def on_torrent_finished_alert(self, _):
self.checkpoint()
if self.get_state().get_total_transferred(DOWNLOAD) > 0 and self.stream is not None:
if self.notifier is not None:
self.notifier.notify(NTFY.TORRENT_FINISHED, self.tdef.get_infohash(),
self.notifier.notify(NTFY.TORRENT_FINISHED.value, self.tdef.get_infohash(),
self.tdef.get_name_as_unicode(), self.hidden or
self.config.get_channel_download())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def __init__(self,
self.metainfo_cache = {} # Dictionary that maps infohashes to cached metainfo items

self.default_alert_mask = lt.alert.category_t.error_notification | lt.alert.category_t.status_notification | \
lt.alert.category_t.storage_notification | lt.alert.category_t.performance_warning | \
lt.alert.category_t.tracker_notification | lt.alert.category_t.debug_notification
lt.alert.category_t.storage_notification | lt.alert.category_t.performance_warning | \
lt.alert.category_t.tracker_notification | lt.alert.category_t.debug_notification
self.session_stats_callback = None
self.state_cb_count = 0

Expand Down Expand Up @@ -151,19 +151,22 @@ def initialize(self):

self.set_download_states_callback(self.sesscb_states_callback)

def notify_shutdown_state(self, state):
self.notifier.notify(NTFY.TRIBLER_SHUTDOWN_STATE.value, state)

async def shutdown(self, timeout=30):
if self.downloads:
self.notifier.notify_shutdown_state("Checkpointing Downloads...")
self.notify_shutdown_state("Checkpointing Downloads...")
await gather(*[download.stop() for download in self.downloads.values()], return_exceptions=True)
self.notifier.notify_shutdown_state("Shutting down Downloads...")
self.notify_shutdown_state("Shutting down Downloads...")
await gather(*[download.shutdown() for download in self.downloads.values()], return_exceptions=True)

self.notifier.notify_shutdown_state("Shutting down Libtorrent Manager...")
self.notify_shutdown_state("Shutting down Libtorrent Manager...")
# If libtorrent session has pending disk io, wait until timeout (default: 30 seconds) to let it finish.
# In between ask for session stats to check if state is clean for shutdown.
# In dummy mode, we immediately shut down the download manager.
while not self.dummy_mode and not self.is_shutdown_ready() and timeout >= 1:
self.notifier.notify_shutdown_state("Waiting for Libtorrent to finish...")
self.notify_shutdown_state("Waiting for Libtorrent to finish...")
self.post_session_stats()
timeout -= 1
await asyncio.sleep(1)
Expand Down Expand Up @@ -244,7 +247,7 @@ def create_session(self, hops=0, store_listen_port=True):
settings['force_proxy'] = True

# Anon listen port is never used anywhere, so we let Libtorrent set it
#settings["listen_interfaces"] = "0.0.0.0:%d" % anon_port
# settings["listen_interfaces"] = "0.0.0.0:%d" % anon_port

# By default block all IPs except 1.1.1.1 (which is used to ensure libtorrent makes a connection to us)
self.update_ip_filter(ltsession, ['1.1.1.1'])
Expand All @@ -255,7 +258,7 @@ def create_session(self, hops=0, store_listen_port=True):
if hops == 0:
proxy_settings = DownloadManager.get_libtorrent_proxy_settings(self.config)
else:
proxy_settings = [SOCKS5_PROXY_DEF, ("127.0.0.1", self.socks_listen_ports[hops-1]), None]
proxy_settings = [SOCKS5_PROXY_DEF, ("127.0.0.1", self.socks_listen_ports[hops - 1]), None]
self.set_proxy_settings(ltsession, *proxy_settings)

for extension in extensions:
Expand All @@ -276,7 +279,7 @@ def create_session(self, hops=0, store_listen_port=True):
except Exception as exc:
self._logger.info(f"could not load libtorrent state, got exception: {exc!r}. starting from scratch")
else:
#ltsession.listen_on(anon_port, anon_port + 20)
# ltsession.listen_on(anon_port, anon_port + 20)

rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
Expand Down Expand Up @@ -369,8 +372,8 @@ def process_alert(self, alert, hops=0):
download = self.downloads.get(infohash)
if download:
is_process_alert = (download.handle and download.handle.is_valid()) \
or (not download.handle and alert_type == 'add_torrent_alert') \
or (download.handle and alert_type == 'torrent_removed_alert')
or (not download.handle and alert_type == 'add_torrent_alert') \
or (download.handle and alert_type == 'torrent_removed_alert')
if is_process_alert:
download.process_alert(alert, alert_type)
else:
Expand All @@ -385,7 +388,7 @@ def process_alert(self, alert, hops=0):
self.listen_ports[hops] = getattr(alert, "port", alert.endpoint[1])

elif alert_type == 'peer_disconnected_alert' and self.notifier:
self.notifier.notify(NTFY.PEER_DISCONNECTED_EVENT, alert.pid.to_bytes())
self.notifier.notify(NTFY.PEER_DISCONNECTED_EVENT.value, alert.pid.to_bytes())

elif alert_type == 'session_stats_alert':
queued_disk_jobs = alert.values['disk.queued_disk_jobs']
Expand Down Expand Up @@ -803,7 +806,7 @@ async def sesscb_states_callback(self, states_list):
if self.state_cb_count % 5 == 0 and download.config.get_hops() == 0 and self.notifier:
for peer in download.get_peerlist():
if str(peer["extended_version"]).startswith('Tribler'):
self.notifier.notify(NTFY.TRIBLER_TORRENT_PEER_UPDATE,
self.notifier.notify(NTFY.TRIBLER_TORRENT_PEER_UPDATE.value,
unhexlify(peer["id"]), infohash, peer["dtotal"])

if self.state_cb_count % 4 == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_metainfo(infohash, timeout=20, hops=None, url=None):
await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=500)

# Ensure that correct torrent metadata was sent through notifier (to MetadataStore)
mock_dlmgr.notifier.notify.assert_called_with(NTFY.TORRENT_METADATA_ADDED, metainfo_dict)
mock_dlmgr.notifier.notify.assert_called_with(NTFY.TORRENT_METADATA_ADDED.value, metainfo_dict)

mock_dlmgr.get_metainfo = get_metainfo
verify_valid_dict(await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=200))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def get_torrent_info(self, request):

# Add the torrent to GigaChannel as a free-for-all entry, so others can search it
self.download_manager.notifier.notify(
NTFY.TORRENT_METADATA_ADDED,
NTFY.TORRENT_METADATA_ADDED.value,
tdef_to_metadata_dict(TorrentDef.load_from_dict(metainfo)))

# TODO(Martijn): store the stuff in a database!!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import time
from collections import deque

from ipv8.taskmanager import TaskManager

import psutil

from ipv8.taskmanager import TaskManager
from tribler_common.simpledefs import NTFY

from tribler_core.components.resource_monitor.implementation.base import ResourceMonitor
from tribler_core.components.resource_monitor.implementation.profiler import YappiProfiler
from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings
Expand Down Expand Up @@ -101,7 +103,7 @@ def record_disk_usage(self, recorded_at=None):
if disk_usage.free < FREE_DISK_THRESHOLD:
self._logger.warning("Warning! Less than 100MB of disk space available")
if self.notifier:
self.notifier.notify(NTFY.LOW_SPACE, self.disk_usage_data[-1])
self.notifier.notify(NTFY.LOW_SPACE.value, self.disk_usage_data[-1])

def get_free_disk_space(self):
return psutil.disk_usage(str(self.state_dir))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import pytest

from tribler_common.simpledefs import NTFY
from tribler_core.components.resource_monitor.implementation.core import CoreResourceMonitor

from tribler_core.components.resource_monitor.implementation.core import CoreResourceMonitor
from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings


Expand Down Expand Up @@ -86,7 +86,7 @@ def fake_get_free_disk_space():
return namedtuple('sdiskusage', disk.keys())(*disk.values())

def on_notify(subject, *args):
assert subject in [NTFY.LOW_SPACE, NTFY.TRIBLER_SHUTDOWN_STATE]
assert subject in [NTFY.LOW_SPACE.value, NTFY.TRIBLER_SHUTDOWN_STATE.value]

resource_monitor.get_free_disk_space = fake_get_free_disk_space
resource_monitor.notifier.notify = on_notify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def on_torrent_health_check_completed(self, infohash, result):
final_response = {}
if not result or not isinstance(result, list):
self._logger.info("Received invalid torrent checker result")
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED,
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value,
{"infohash": hexlify(infohash),
"num_seeders": 0,
"num_leechers": 0,
Expand Down Expand Up @@ -329,7 +329,7 @@ def on_torrent_health_check_completed(self, infohash, result):
self.update_torrents_checked(torrent_update_dict)

# TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex!
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED,
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value,
{"infohash": hexlify(infohash),
"num_seeders": torrent_update_dict["seeders"],
"num_leechers": torrent_update_dict["leechers"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
import platform
from distutils.version import LooseVersion

from aiohttp import (
ClientSession,
ClientTimeout,
)
from aiohttp import ClientSession, ClientTimeout

from ipv8.taskmanager import TaskManager

from tribler_common.simpledefs import NTFY

from tribler_core.notifier import Notifier
from tribler_core.version import version_id

Expand Down Expand Up @@ -64,7 +63,7 @@ async def check_new_version_api(self, version_check_url):
response_dict = await response.json(content_type=None)
version = response_dict['name'][1:]
if LooseVersion(version) > LooseVersion(version_id):
self.notifier.notify(NTFY.TRIBLER_NEW_VERSION, version)
self.notifier.notify(NTFY.TRIBLER_NEW_VERSION.value, version)
return True
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from pathlib import Path

from ipv8.taskmanager import TaskManager

from tribler_common.simpledefs import NTFY

from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler_core.components.libtorrent.torrentdef import TorrentDef
from tribler_core.notifier import Notifier
Expand Down Expand Up @@ -41,7 +43,7 @@ def cleanup_torrent_file(self, root, name):
self._logger.warning(f'Cant rename the file to {path}. Exception: {e}')

self._logger.warning("Watch folder - corrupt torrent file %s", name)
self.notifier.notify(NTFY.WATCH_FOLDER_CORRUPT_FILE, name)
self.notifier.notify(NTFY.WATCH_FOLDER_CORRUPT_FILE.value, name)

def check_watch_folder(self):
if not self.watch_folder.is_dir():
Expand Down
85 changes: 44 additions & 41 deletions src/tribler-core/tribler_core/notifier.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,56 @@
"""
Notifier.

Author(s): Vadim Bulavintsev
"""
import logging
from asyncio import get_event_loop

from tribler_common.simpledefs import NTFY
from collections import defaultdict
from typing import Callable, Dict


class Notifier:

def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
self.observers = {}
self.logger = logging.getLogger(self.__class__.__name__)

# we use type Dict for `self.observers` for providing the deterministic order of callbacks
# Therefore `value: bool` here is unnecessary, and it just newer use.
self.observers: Dict[str, Dict[Callable, bool]] = defaultdict(dict)

# @ichorid:
# We have to note the event loop reference, because when we call "notify" from an external thread,
# we don't know anything about the existence of the event loop, and get_event_loop() can't find
# the original event loop from an external thread.
self._loop = get_event_loop()
# We remember the event loop from the thread that runs the Notifier
# to be able to schedule notifications from external threads
self._loop = get_event_loop()

def add_observer(self, subject, callback):
assert isinstance(subject, NTFY)
self.observers[subject] = self.observers.get(subject, [])
self.observers[subject].append(callback)
self._logger.debug(f"Add observer topic {subject} callback {callback}")

def remove_observer(self, subject, callback):
if subject not in self.observers:
return
if callback not in self.observers[subject]:
return

self.observers[subject].remove(callback)
self._logger.debug(f"Remove observer topic {subject} callback {callback}")

def notify(self, subject, *args):
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(self._notify, subject, *args)

def _notify(self, subject, *args):
if subject not in self.observers:
self._logger.warning(f"Called notification on a non-existing subject {subject}")
return
for callback in self.observers[subject]:
callback(*args)

def notify_shutdown_state(self, state):
self._logger.info("Tribler shutdown state notification:%s", state)
self.notify(NTFY.TRIBLER_SHUTDOWN_STATE, state)
def add_observer(self, topic: str, callback: Callable):
""" Add the observer for the topic.
Order of the added callbacks will be the same order for the calling the callbacks.
`add_observer` doesn't support duplicated callbacks.
"""
self.logger.debug(f"Add observer topic {topic}")
self.observers[topic][callback] = True

def remove_observer(self, topic: str, callback: Callable):
""" Remove the observer from the topic. In the case of a missed callback no error will be raised.
"""
self.logger.debug(f"Remove observer topic {topic}")
self.observers[topic].pop(callback, None)

def notify(self, topic: str, *args, **kwargs):
""" Notify all observers about the topic.

Each call of observer's callback is isolated and an exception that could
occur in this call will not affect all other calls.
"""
try:
def _notify(_callback):
_callback(*args, **kwargs)

for callback in list(self.observers[topic]):
# @ichorid:
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(_notify, callback)
except RuntimeError as e:
# Raises RuntimeError if called on a loop that’s been closed.
# This can happen on a secondary thread when the main application is shutting down.
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe
self.logger.warning(e)
Loading