Skip to content

Commit

Permalink
Make notifier more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Jan 4, 2022
1 parent 1ff3697 commit 69c5dff
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
from tribler_common.simpledefs import CHANNELS_VIEW_UUID, NTFY

from tribler_core.components.gigachannel.community.discovery_booster import DiscoveryBooster
from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler_core.components.metadata_store.utils import NoChannelSourcesException
from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler_core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler_core.components.metadata_store.utils import NoChannelSourcesException
from tribler_core.utilities.unicode import hexlify

minimal_blob_size = 200
Expand Down Expand Up @@ -152,7 +152,8 @@ def on_packet_callback(_, processing_results):
)
]
if self.notifier and results:
self.notifier.notify(NTFY.CHANNEL_DISCOVERED, {"results": results, "uuid": str(CHANNELS_VIEW_UUID)})
self.notifier.notify(NTFY.CHANNEL_DISCOVERED.value,
{"results": results, "uuid": str(CHANNELS_VIEW_UUID)})

request_dict = {
"metadata_type": [CHANNEL_TORRENT],
Expand Down Expand Up @@ -211,7 +212,7 @@ def notify_gui(request, processing_results):
]
if self.notifier:
self.notifier.notify(
NTFY.REMOTE_QUERY_RESULTS,
NTFY.REMOTE_QUERY_RESULTS.value,
{"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def _process_download():
updated_channel = self.mds.ChannelMetadata.get(public_key=channel.public_key, id_=channel.id_)
channel_dict = updated_channel.to_simple_dict() if updated_channel else None
if updated_channel:
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED, channel_dict)
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value, channel_dict)

def updated_my_channel(self, tdef):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
from tribler_common.osutils import fix_filebasename
from tribler_common.simpledefs import DLSTATUS_SEEDING, DLSTATUS_STOPPED, DOWNLOAD, NTFY

from tribler_core.exceptions import SaveResumeDataError
from tribler_core.components.libtorrent.download_manager.download_config import DownloadConfig
from tribler_core.components.libtorrent.download_manager.download_state import DownloadState
from tribler_core.components.libtorrent.settings import DownloadDefaultsSettings
from tribler_core.components.libtorrent.download_manager.stream import Stream
from tribler_core.components.libtorrent.settings import DownloadDefaultsSettings
from tribler_core.components.libtorrent.torrentdef import TorrentDef, TorrentDefNoMetainfo
from tribler_core.notifier import Notifier
from tribler_core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt
from tribler_core.utilities.path_util import Path
from tribler_core.components.libtorrent.utils.torrent_utils import check_handle, get_info_from_handle, require_handle
from tribler_core.exceptions import SaveResumeDataError
from tribler_core.notifier import Notifier
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.unicode import ensure_unicode, hexlify
from tribler_core.utilities.utilities import bdecode_compat

Expand Down Expand Up @@ -391,7 +391,7 @@ def on_torrent_finished_alert(self, _):
self.checkpoint()
if self.get_state().get_total_transferred(DOWNLOAD) > 0 and self.stream is not None:
if self.notifier is not None:
self.notifier.notify(NTFY.TORRENT_FINISHED, self.tdef.get_infohash(),
self.notifier.notify(NTFY.TORRENT_FINISHED.value, self.tdef.get_infohash(),
self.tdef.get_name_as_unicode(), self.hidden or
self.config.get_channel_download())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def __init__(self,
self.metainfo_cache = {} # Dictionary that maps infohashes to cached metainfo items

self.default_alert_mask = lt.alert.category_t.error_notification | lt.alert.category_t.status_notification | \
lt.alert.category_t.storage_notification | lt.alert.category_t.performance_warning | \
lt.alert.category_t.tracker_notification | lt.alert.category_t.debug_notification
lt.alert.category_t.storage_notification | lt.alert.category_t.performance_warning | \
lt.alert.category_t.tracker_notification | lt.alert.category_t.debug_notification
self.session_stats_callback = None
self.state_cb_count = 0

Expand Down Expand Up @@ -151,19 +151,22 @@ def initialize(self):

self.set_download_states_callback(self.sesscb_states_callback)

def notify_shutdown_state(self, state):
self.notifier.notify(NTFY.TRIBLER_SHUTDOWN_STATE.value, state)

async def shutdown(self, timeout=30):
if self.downloads:
self.notifier.notify_shutdown_state("Checkpointing Downloads...")
self.notify_shutdown_state("Checkpointing Downloads...")
await gather(*[download.stop() for download in self.downloads.values()], return_exceptions=True)
self.notifier.notify_shutdown_state("Shutting down Downloads...")
self.notify_shutdown_state("Shutting down Downloads...")
await gather(*[download.shutdown() for download in self.downloads.values()], return_exceptions=True)

self.notifier.notify_shutdown_state("Shutting down Libtorrent Manager...")
self.notify_shutdown_state("Shutting down Libtorrent Manager...")
# If libtorrent session has pending disk io, wait until timeout (default: 30 seconds) to let it finish.
# In between ask for session stats to check if state is clean for shutdown.
# In dummy mode, we immediately shut down the download manager.
while not self.dummy_mode and not self.is_shutdown_ready() and timeout >= 1:
self.notifier.notify_shutdown_state("Waiting for Libtorrent to finish...")
self.notify_shutdown_state("Waiting for Libtorrent to finish...")
self.post_session_stats()
timeout -= 1
await asyncio.sleep(1)
Expand Down Expand Up @@ -244,7 +247,7 @@ def create_session(self, hops=0, store_listen_port=True):
settings['force_proxy'] = True

# Anon listen port is never used anywhere, so we let Libtorrent set it
#settings["listen_interfaces"] = "0.0.0.0:%d" % anon_port
# settings["listen_interfaces"] = "0.0.0.0:%d" % anon_port

# By default block all IPs except 1.1.1.1 (which is used to ensure libtorrent makes a connection to us)
self.update_ip_filter(ltsession, ['1.1.1.1'])
Expand All @@ -255,7 +258,7 @@ def create_session(self, hops=0, store_listen_port=True):
if hops == 0:
proxy_settings = DownloadManager.get_libtorrent_proxy_settings(self.config)
else:
proxy_settings = [SOCKS5_PROXY_DEF, ("127.0.0.1", self.socks_listen_ports[hops-1]), None]
proxy_settings = [SOCKS5_PROXY_DEF, ("127.0.0.1", self.socks_listen_ports[hops - 1]), None]
self.set_proxy_settings(ltsession, *proxy_settings)

for extension in extensions:
Expand All @@ -276,7 +279,7 @@ def create_session(self, hops=0, store_listen_port=True):
except Exception as exc:
self._logger.info(f"could not load libtorrent state, got exception: {exc!r}. starting from scratch")
else:
#ltsession.listen_on(anon_port, anon_port + 20)
# ltsession.listen_on(anon_port, anon_port + 20)

rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
Expand Down Expand Up @@ -369,8 +372,8 @@ def process_alert(self, alert, hops=0):
download = self.downloads.get(infohash)
if download:
is_process_alert = (download.handle and download.handle.is_valid()) \
or (not download.handle and alert_type == 'add_torrent_alert') \
or (download.handle and alert_type == 'torrent_removed_alert')
or (not download.handle and alert_type == 'add_torrent_alert') \
or (download.handle and alert_type == 'torrent_removed_alert')
if is_process_alert:
download.process_alert(alert, alert_type)
else:
Expand All @@ -385,7 +388,7 @@ def process_alert(self, alert, hops=0):
self.listen_ports[hops] = getattr(alert, "port", alert.endpoint[1])

elif alert_type == 'peer_disconnected_alert' and self.notifier:
self.notifier.notify(NTFY.PEER_DISCONNECTED_EVENT, alert.pid.to_bytes())
self.notifier.notify(NTFY.PEER_DISCONNECTED_EVENT.value, alert.pid.to_bytes())

elif alert_type == 'session_stats_alert':
queued_disk_jobs = alert.values['disk.queued_disk_jobs']
Expand Down Expand Up @@ -803,7 +806,7 @@ async def sesscb_states_callback(self, states_list):
if self.state_cb_count % 5 == 0 and download.config.get_hops() == 0 and self.notifier:
for peer in download.get_peerlist():
if str(peer["extended_version"]).startswith('Tribler'):
self.notifier.notify(NTFY.TRIBLER_TORRENT_PEER_UPDATE,
self.notifier.notify(NTFY.TRIBLER_TORRENT_PEER_UPDATE.value,
unhexlify(peer["id"]), infohash, peer["dtotal"])

if self.state_cb_count % 4 == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def get_metainfo(infohash, timeout=20, hops=None, url=None):
await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=500)

# Ensure that correct torrent metadata was sent through notifier (to MetadataStore)
mock_dlmgr.notifier.notify.assert_called_with(NTFY.TORRENT_METADATA_ADDED, metainfo_dict)
mock_dlmgr.notifier.notify.assert_called_with(NTFY.TORRENT_METADATA_ADDED.value, metainfo_dict)

mock_dlmgr.get_metainfo = get_metainfo
verify_valid_dict(await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=200))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ async def get_torrent_info(self, request):

# Add the torrent to GigaChannel as a free-for-all entry, so others can search it
self.download_manager.notifier.notify(
NTFY.TORRENT_METADATA_ADDED,
NTFY.TORRENT_METADATA_ADDED.value,
tdef_to_metadata_dict(TorrentDef.load_from_dict(metainfo)))

# TODO(Martijn): store the stuff in a database!!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
import time
from collections import deque

from ipv8.taskmanager import TaskManager

import psutil

from ipv8.taskmanager import TaskManager
from tribler_common.simpledefs import NTFY

from tribler_core.components.resource_monitor.implementation.base import ResourceMonitor
from tribler_core.components.resource_monitor.implementation.profiler import YappiProfiler
from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings
Expand Down Expand Up @@ -101,7 +103,7 @@ def record_disk_usage(self, recorded_at=None):
if disk_usage.free < FREE_DISK_THRESHOLD:
self._logger.warning("Warning! Less than 100MB of disk space available")
if self.notifier:
self.notifier.notify(NTFY.LOW_SPACE, self.disk_usage_data[-1])
self.notifier.notify(NTFY.LOW_SPACE.value, self.disk_usage_data[-1])

def get_free_disk_space(self):
return psutil.disk_usage(str(self.state_dir))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import pytest

from tribler_common.simpledefs import NTFY
from tribler_core.components.resource_monitor.implementation.core import CoreResourceMonitor

from tribler_core.components.resource_monitor.implementation.core import CoreResourceMonitor
from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings


Expand Down Expand Up @@ -86,7 +86,7 @@ def fake_get_free_disk_space():
return namedtuple('sdiskusage', disk.keys())(*disk.values())

def on_notify(subject, *args):
assert subject in [NTFY.LOW_SPACE, NTFY.TRIBLER_SHUTDOWN_STATE]
assert subject in [NTFY.LOW_SPACE.value, NTFY.TRIBLER_SHUTDOWN_STATE.value]

resource_monitor.get_free_disk_space = fake_get_free_disk_space
resource_monitor.notifier.notify = on_notify
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def on_torrent_health_check_completed(self, infohash, result):
final_response = {}
if not result or not isinstance(result, list):
self._logger.info("Received invalid torrent checker result")
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED,
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value,
{"infohash": hexlify(infohash),
"num_seeders": 0,
"num_leechers": 0,
Expand Down Expand Up @@ -329,7 +329,7 @@ def on_torrent_health_check_completed(self, infohash, result):
self.update_torrents_checked(torrent_update_dict)

# TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex!
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED,
self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value,
{"infohash": hexlify(infohash),
"num_seeders": torrent_update_dict["seeders"],
"num_leechers": torrent_update_dict["leechers"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
import platform
from distutils.version import LooseVersion

from aiohttp import (
ClientSession,
ClientTimeout,
)
from aiohttp import ClientSession, ClientTimeout

from ipv8.taskmanager import TaskManager

from tribler_common.simpledefs import NTFY

from tribler_core.notifier import Notifier
from tribler_core.version import version_id

Expand Down Expand Up @@ -64,7 +63,7 @@ async def check_new_version_api(self, version_check_url):
response_dict = await response.json(content_type=None)
version = response_dict['name'][1:]
if LooseVersion(version) > LooseVersion(version_id):
self.notifier.notify(NTFY.TRIBLER_NEW_VERSION, version)
self.notifier.notify(NTFY.TRIBLER_NEW_VERSION.value, version)
return True
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from pathlib import Path

from ipv8.taskmanager import TaskManager

from tribler_common.simpledefs import NTFY

from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler_core.components.libtorrent.torrentdef import TorrentDef
from tribler_core.notifier import Notifier
Expand Down Expand Up @@ -41,7 +43,7 @@ def cleanup_torrent_file(self, root, name):
self._logger.warning(f'Cant rename the file to {path}. Exception: {e}')

self._logger.warning("Watch folder - corrupt torrent file %s", name)
self.notifier.notify(NTFY.WATCH_FOLDER_CORRUPT_FILE, name)
self.notifier.notify(NTFY.WATCH_FOLDER_CORRUPT_FILE.value, name)

def check_watch_folder(self):
if not self.watch_folder.is_dir():
Expand Down
73 changes: 32 additions & 41 deletions src/tribler-core/tribler_core/notifier.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,44 @@
"""
Notifier.
Author(s): Vadim Bulavintsev
"""
import logging
from asyncio import get_event_loop

from tribler_common.simpledefs import NTFY
from collections import defaultdict
from typing import Callable, Dict


class Notifier:

def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
self.observers = {}
self.logger = logging.getLogger(self.__class__.__name__)
self.observers: Dict[str, set] = defaultdict(set)
# @ichorid:
# We have to note the event loop reference, because when we call "notify" from an external thread,
# we don't know anything about the existence of the event loop, and get_event_loop() can't find
# the original event loop from an external thread.
self._loop = get_event_loop()
# We remember the event loop from the thread that runs the Notifier
# to be able to schedule notifications from external threads
self._loop = get_event_loop()

def add_observer(self, subject, callback):
assert isinstance(subject, NTFY)
self.observers[subject] = self.observers.get(subject, [])
self.observers[subject].append(callback)
self._logger.debug(f"Add observer topic {subject} callback {callback}")

def remove_observer(self, subject, callback):
if subject not in self.observers:
return
if callback not in self.observers[subject]:
return

self.observers[subject].remove(callback)
self._logger.debug(f"Remove observer topic {subject} callback {callback}")

def notify(self, subject, *args):
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(self._notify, subject, *args)

def _notify(self, subject, *args):
if subject not in self.observers:
self._logger.warning(f"Called notification on a non-existing subject {subject}")
return
for callback in self.observers[subject]:
callback(*args)

def notify_shutdown_state(self, state):
self._logger.info("Tribler shutdown state notification:%s", state)
self.notify(NTFY.TRIBLER_SHUTDOWN_STATE, state)
def add_observer(self, topic: str, callback: Callable):
self.logger.debug(f"Add observer topic {topic}")
self.observers[topic].add(callback)

def remove_observer(self, topic: str, callback: Callable):
self.logger.debug(f"Remove observer topic {topic}")
self.observers[topic].discard(callback)

def notify(self, topic: str, *args, **kwargs):
def _notify(_topic, _kwargs, *_args):
for callback in self.observers[_topic]:
try:
callback(*_args, **_kwargs)
except Exception as _e: # pylint: disable=broad-except
self.logger.exception(_e)

try:
# @ichorid:
# We have to call the notifier callbacks through call_soon_threadsafe
# because the notify method could have been called from a non-reactor thread
self._loop.call_soon_threadsafe(_notify, topic, kwargs, *args)
except RuntimeError as e:
# Raises RuntimeError if called on a loop that’s been closed.
# This can happen on a secondary thread when the main application is shutting down.
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe
self.logger.warning(e)
4 changes: 2 additions & 2 deletions src/tribler-core/tribler_core/start_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ async def core_session(config: TriblerConfig, components: List[Component]):

# If there is a config error, report to the user via GUI notifier
if config.error:
session.notifier.notify(NTFY.REPORT_CONFIG_ERROR, config.error)
session.notifier.notify(NTFY.REPORT_CONFIG_ERROR.value, config.error)

# SHUTDOWN
await session.shutdown_event.wait()
await session.shutdown()

if not config.gui_test_mode:
session.notifier.notify_shutdown_state("Saving configuration...")
session.notifier.notify(NTFY.TRIBLER_SHUTDOWN_STATE.value, "Saving configuration...")
config.write()


Expand Down
Loading

0 comments on commit 69c5dff

Please sign in to comment.