From 107e0265447167409f8166b3d35e3ccf077cd32a Mon Sep 17 00:00:00 2001 From: Alexander Kozlovsky Date: Tue, 11 Jan 2022 14:54:25 +0100 Subject: [PATCH 1/2] Statically typed notifier --- src/tribler-core/run_tunnel_helper.py | 7 +- .../tribler_core/components/base.py | 2 +- .../community/gigachannel_community.py | 15 +- .../tests/test_gigachannel_community.py | 39 +- .../gigachannel_manager.py | 5 +- .../tests/test_gigachannel_manager.py | 24 +- .../libtorrent/download_manager/download.py | 13 +- .../download_manager/download_manager.py | 13 +- .../tests/test_torrentinfo_endpoint.py | 14 +- .../restapi/torrentinfo_endpoint.py | 5 +- .../libtorrent/tests/test_download_manager.py | 76 ++-- .../db/orm_bindings/torrent_metadata.py | 16 +- .../components/metadata_store/db/store.py | 7 +- .../metadata_store_component.py | 5 +- .../restapi/tests/test_metadata_endpoint.py | 6 +- .../tests/test_channel_download.py | 6 +- .../components/payout/payout_component.py | 12 +- .../components/payout/payout_manager.py | 26 +- .../resource_monitor/implementation/core.py | 6 +- .../tests/test_resource_monitor.py | 18 +- .../restapi/rest/events_endpoint.py | 87 ++-- .../restapi/rest/tests/test_debug_endpoint.py | 6 +- .../rest/tests/test_events_endpoint.py | 51 ++- .../tag/rules/tag_rules_processor.py | 12 +- .../rules/tests/test_tag_rules_processor.py | 17 +- .../tests/test_torrentchecker.py | 8 +- .../torrent_checker/torrent_checker.py | 26 +- .../tunnel/community/tunnel_community.py | 8 +- .../test_tunnel_community.py | 6 +- .../version_check/tests/test_versioncheck.py | 8 +- .../version_check/versioncheck_manager.py | 6 +- .../watch_folder/tests/test_watch_folder.py | 4 +- .../components/watch_folder/watch_folder.py | 6 +- src/tribler-core/tribler_core/conftest.py | 10 +- .../tribler_core/notifications.py | 89 ++++ src/tribler-core/tribler_core/notifier.py | 56 --- src/tribler-core/tribler_core/start_core.py | 6 +- .../tribler_core/tests/test_notifier.py | 111 ----- .../upgrade/tests/test_upgrader.py | 6 - .../tribler_core/utilities/notifier.py | 252 ++++++++++++ .../utilities/tests/test_notifier.py | 381 ++++++++++++++++++ .../tribler_gui/event_request_manager.py | 73 ++-- src/tribler-gui/tribler_gui/tribler_window.py | 2 +- 43 files changed, 1048 insertions(+), 498 deletions(-) create mode 100644 src/tribler-core/tribler_core/notifications.py delete mode 100644 src/tribler-core/tribler_core/notifier.py delete mode 100644 src/tribler-core/tribler_core/tests/test_notifier.py create mode 100644 src/tribler-core/tribler_core/utilities/notifier.py create mode 100644 src/tribler-core/tribler_core/utilities/tests/test_notifier.py diff --git a/src/tribler-core/run_tunnel_helper.py b/src/tribler-core/run_tunnel_helper.py index 23b4afd86e7..044d0201622 100644 --- a/src/tribler-core/run_tunnel_helper.py +++ b/src/tribler-core/run_tunnel_helper.py @@ -11,8 +11,10 @@ from asyncio import ensure_future, get_event_loop from ipaddress import AddressValueError, IPv4Address +from ipv8.messaging.anonymization.tunnel import Circuit from ipv8.taskmanager import TaskManager +from tribler_core import notifications from tribler_core.components.bandwidth_accounting.bandwidth_accounting_component import BandwidthAccountingComponent from tribler_core.components.base import Session from tribler_core.components.ipv8.ipv8_component import Ipv8Component @@ -24,7 +26,6 @@ from tribler_core.config.tribler_config import TriblerConfig from tribler_core.utilities.osutils import get_root_state_directory from tribler_core.utilities.path_util import Path -from tribler_core.utilities.simpledefs import NTFY logger = logging.getLogger(__name__) @@ -130,7 +131,7 @@ async def signal_handler(sig): new_strategies.append((strategy, target_peers)) ipv8.strategies = new_strategies - def circuit_removed(self, circuit, additional_info): + def circuit_removed(self, circuit: Circuit, additional_info: str): ipv8 = Ipv8Component.instance().ipv8 ipv8.network.remove_by_address(circuit.peer.address) if self.log_circuits: @@ -146,7 +147,7 @@ async def start(self, options): session.set_as_default() self.log_circuits = options.log_circuits - session.notifier.add_observer(NTFY.TUNNEL_REMOVE.value, self.circuit_removed) + session.notifier.add_observer(notifications.circuit_removed, self.circuit_removed) await session.start_components() diff --git a/src/tribler-core/tribler_core/components/base.py b/src/tribler-core/tribler_core/components/base.py index 500bf578bb4..3b1bcac44b1 100644 --- a/src/tribler-core/tribler_core/components/base.py +++ b/src/tribler-core/tribler_core/components/base.py @@ -10,10 +10,10 @@ from typing import Dict, List, Optional, Set, Type, TypeVar, Union from tribler_core.config.tribler_config import TriblerConfig -from tribler_core.notifier import Notifier from tribler_core.utilities.crypto_patcher import patch_crypto_be_discovery from tribler_core.utilities.install_dir import get_lib_path from tribler_core.utilities.network_utils import default_network_utils +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.simpledefs import STATEDIR_CHANNELS_DIR, STATEDIR_DB_DIR diff --git a/src/tribler-core/tribler_core/components/gigachannel/community/gigachannel_community.py b/src/tribler-core/tribler_core/components/gigachannel/community/gigachannel_community.py index 3bda0ea2ba2..fd3f523a2ac 100644 --- a/src/tribler-core/tribler_core/components/gigachannel/community/gigachannel_community.py +++ b/src/tribler-core/tribler_core/components/gigachannel/community/gigachannel_community.py @@ -10,12 +10,14 @@ from pony.orm import db_session +from tribler_core import notifications from tribler_core.components.ipv8.discovery_booster import DiscoveryBooster from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState from tribler_core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity from tribler_core.components.metadata_store.utils import NoChannelSourcesException -from tribler_core.utilities.simpledefs import CHANNELS_VIEW_UUID, NTFY +from tribler_core.utilities.notifier import Notifier +from tribler_core.utilities.simpledefs import CHANNELS_VIEW_UUID from tribler_core.utilities.unicode import hexlify minimal_blob_size = 200 @@ -92,7 +94,7 @@ def create_introduction_response( ) def __init__( - self, *args, notifier=None, **kwargs + self, *args, notifier: Notifier = None, **kwargs ): # pylint: disable=unused-argument # ACHTUNG! We create a separate instance of Network for this community because it # walks aggressively and wants lots of peers, which can interfere with other communities @@ -151,8 +153,7 @@ def on_packet_callback(_, processing_results): ) ] if self.notifier and results: - self.notifier.notify(NTFY.CHANNEL_DISCOVERED.value, - {"results": results, "uuid": str(CHANNELS_VIEW_UUID)}) + self.notifier[notifications.channel_discovered]({"results": results, "uuid": str(CHANNELS_VIEW_UUID)}) request_dict = { "metadata_type": [CHANNEL_TORRENT], @@ -210,10 +211,8 @@ def notify_gui(request, processing_results): if r.obj_state in (ObjState.NEW_OBJECT, ObjState.UPDATED_LOCAL_VERSION) ] if self.notifier: - self.notifier.notify( - NTFY.REMOTE_QUERY_RESULTS.value, - {"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)}, - ) + self.notifier[notifications.remote_query_results]( + {"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)}) # Try sending the request to at least some peers that we know have it if "channel_pk" in kwargs and "origin_id" in kwargs: diff --git a/src/tribler-core/tribler_core/components/gigachannel/community/tests/test_gigachannel_community.py b/src/tribler-core/tribler_core/components/gigachannel/community/tests/test_gigachannel_community.py index d3586093f78..e3edc981659 100644 --- a/src/tribler-core/tribler_core/components/gigachannel/community/tests/test_gigachannel_community.py +++ b/src/tribler-core/tribler_core/components/gigachannel/community/tests/test_gigachannel_community.py @@ -1,6 +1,5 @@ import time from datetime import datetime -from unittest import mock from unittest.mock import AsyncMock, Mock, PropertyMock, patch from ipv8.keyvault.crypto import default_eccrypto @@ -20,7 +19,7 @@ from tribler_core.components.metadata_store.db.store import MetadataStore from tribler_core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler_core.components.metadata_store.utils import RequestTimeoutException -from tribler_core.notifier import Notifier +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.path_util import Path from tribler_core.utilities.utilities import random_infohash @@ -55,7 +54,7 @@ def create_node(self, *args, **kwargs): kwargs['metadata_store'] = metadata_store kwargs['settings'] = ChantSettings() kwargs['rqc_settings'] = RemoteQueryCommunitySettings() - with mock.patch('tribler_core.components.gigachannel.community.gigachannel_community.DiscoveryBooster'): + with patch('tribler_core.components.gigachannel.community.gigachannel_community.DiscoveryBooster'): node = super().create_node(*args, **kwargs) self.count += 1 return node @@ -117,27 +116,24 @@ async def test_gigachannel_search(self): self.nodes[1].overlay.mds.TorrentMetadata(title=U_TORRENT, infohash=random_infohash()) self.nodes[1].overlay.mds.TorrentMetadata(title="debian torrent", infohash=random_infohash()) - notification_calls = [] - - def mock_notify(_, args): - notification_calls.append(args) - - self.nodes[2].overlay.notifier = Notifier() - self.nodes[2].overlay.notifier.notify = lambda sub, args: mock_notify(self.nodes[2].overlay, args) + notifier = Notifier(loop=self.loop) + notifier.notify = Mock() + self.nodes[2].overlay.notifier = notifier self.nodes[2].overlay.send_search_request(**{"txt_filter": "ubuntu*"}) await self.deliver_messages(timeout=0.5) + # Check that the notifier callback was called on both entries + titles = sorted(call.args[1]["results"][0]["name"] for call in notifier.notify.call_args_list) + assert titles == [U_CHANNEL, U_TORRENT] + with db_session: assert self.nodes[2].overlay.mds.ChannelNode.select().count() == 2 assert ( self.nodes[2].overlay.mds.ChannelNode.select(lambda g: g.title in (U_CHANNEL, U_TORRENT)).count() == 2 ) - # Check that the notifier callback was called on both entries - assert [U_CHANNEL, U_TORRENT] == sorted([c["results"][0]["name"] for c in notification_calls]) - def test_query_on_introduction(self): """ Test querying a peer that was just introduced to us. @@ -198,18 +194,18 @@ async def test_remote_select_subscribed_channels(self): channel_uns = self.nodes[0].overlay.mds.ChannelMetadata.create_channel("channel unsub", "") channel_uns.subscribed = False - def mock_notify(overlay, args): - overlay.notified_results = True - self.assertTrue("results" in args) - - self.nodes[1].overlay.notifier = Notifier() - self.nodes[1].overlay.notifier.notify = lambda sub, args: mock_notify(self.nodes[1].overlay, args) + notifier = Notifier(loop=self.loop) + notifier.notify = Mock() + self.nodes[1].overlay.notifier = notifier peer = self.nodes[0].my_peer await self.introduce_nodes() - await self.deliver_messages(timeout=0.5) + # Check that the notifier callback is called on new channel entries + notifier.notify.assert_called() + assert "results" in notifier.notify.call_args.args[1] + with db_session: received_channels = self.nodes[1].overlay.mds.ChannelMetadata.select(lambda g: g.title == "channel sub") self.assertEqual(num_channels, received_channels.count()) @@ -225,9 +221,6 @@ def mock_notify(overlay, args): for chan in self.nodes[1].overlay.mds.ChannelMetadata.select(): self.assertTrue(chan.votes > 0.0) - # Check that the notifier callback is called on new channel entries - self.assertTrue(self.nodes[1].overlay.notified_results) - def test_channels_peers_mapping_drop_excess_peers(self): """ Test dropping old excess peers from a channel to peers mapping diff --git a/src/tribler-core/tribler_core/components/gigachannel_manager/gigachannel_manager.py b/src/tribler-core/tribler_core/components/gigachannel_manager/gigachannel_manager.py index 7af32ce037b..a1b69686d89 100644 --- a/src/tribler-core/tribler_core/components/gigachannel_manager/gigachannel_manager.py +++ b/src/tribler-core/tribler_core/components/gigachannel_manager/gigachannel_manager.py @@ -6,13 +6,14 @@ from pony.orm import db_session +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.download_config import DownloadConfig from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager from tribler_core.components.libtorrent.torrentdef import TorrentDef from tribler_core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT from tribler_core.components.metadata_store.db.store import MetadataStore -from tribler_core.notifier import Notifier +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.simpledefs import DLSTATUS_SEEDING, NTFY from tribler_core.utilities.unicode import hexlify @@ -290,7 +291,7 @@ def _process_download(): updated_channel = self.mds.ChannelMetadata.get(public_key=channel.public_key, id_=channel.id_) channel_dict = updated_channel.to_simple_dict() if updated_channel else None if updated_channel: - self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value, channel_dict) + self.notifier[notifications.channel_entity_updated](channel_dict) def updated_my_channel(self, tdef): """ diff --git a/src/tribler-core/tribler_core/components/gigachannel_manager/tests/test_gigachannel_manager.py b/src/tribler-core/tribler_core/components/gigachannel_manager/tests/test_gigachannel_manager.py index 1392e25e368..c2c5d79ae80 100644 --- a/src/tribler-core/tribler_core/components/gigachannel_manager/tests/test_gigachannel_manager.py +++ b/src/tribler-core/tribler_core/components/gigachannel_manager/tests/test_gigachannel_manager.py @@ -2,7 +2,7 @@ from asyncio import Future from datetime import datetime from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, patch from ipv8.util import succeed @@ -42,8 +42,8 @@ async def gigachannel_manager(metadata_store): chanman = GigaChannelManager( state_dir=metadata_store.channels_dir.parent, metadata_store=metadata_store, - download_manager=Mock(), - notifier=Mock(), + download_manager=MagicMock(), + notifier=MagicMock() ) yield chanman await chanman.shutdown() @@ -55,7 +55,7 @@ async def test_regen_personal_channel_no_torrent(personal_channel, gigachannel_m Test regenerating a non-existing personal channel torrent at startup """ gigachannel_manager.download_manager.get_download = lambda _: None - gigachannel_manager.regenerate_channel_torrent = Mock() + gigachannel_manager.regenerate_channel_torrent = MagicMock() await gigachannel_manager.check_and_regen_personal_channels() gigachannel_manager.regenerate_channel_torrent.assert_called_once() @@ -86,8 +86,8 @@ async def test_regenerate_channel_torrent(personal_channel, metadata_store, giga # Test trying to regenerate a non-existing channel assert await gigachannel_manager.regenerate_channel_torrent(chan_pk, chan_id + 1) is None - # Mock existing downloads removal-related functions - gigachannel_manager.download_manager.get_downloads_by_name = lambda *_: [Mock()] + # MagicMock existing downloads removal-related functions + gigachannel_manager.download_manager.get_downloads_by_name = lambda *_: [MagicMock()] downloads_to_remove = [] async def mock_remove_download(download_obj, **_): @@ -101,8 +101,8 @@ async def mock_remove_download(download_obj, **_): assert len(downloads_to_remove) == 1 # Test regenerating a non-empty channel - gigachannel_manager.updated_my_channel = Mock() - metadata_store.ChannelMetadata.consolidate_channel_torrent = lambda *_: Mock() + gigachannel_manager.updated_my_channel = MagicMock() + metadata_store.ChannelMetadata.consolidate_channel_torrent = lambda *_: MagicMock() with patch("tribler_core.components.libtorrent.torrentdef.TorrentDef.load_from_dict"): await gigachannel_manager.regenerate_channel_torrent(chan_pk, chan_id) gigachannel_manager.updated_my_channel.assert_called_once() @@ -110,7 +110,7 @@ async def mock_remove_download(download_obj, **_): def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir): tdef = TorrentDef.load_from_dict(update_metainfo) - gigachannel_manager.download_manager.start_download = Mock() + gigachannel_manager.download_manager.start_download = MagicMock() gigachannel_manager.download_manager.download_exists = lambda *_: False gigachannel_manager.updated_my_channel(tdef) gigachannel_manager.download_manager.start_download.assert_called_once() @@ -120,7 +120,7 @@ def test_updated_my_channel(personal_channel, gigachannel_manager, tmpdir): async def test_check_and_regen_personal_channel_torrent(personal_channel, gigachannel_manager): with db_session: chan_pk, chan_id = personal_channel.public_key, personal_channel.id_ - chan_download = Mock() + chan_download = MagicMock() async def mock_wait(*_): pass @@ -135,7 +135,7 @@ async def mock_wait_2(*_): chan_download.wait_for_status = mock_wait_2 # Test timeout waiting for seeding state and then regen - f = Mock() + f = MagicMock() async def mock_regen(*_): f() @@ -212,7 +212,7 @@ def fake_get_metainfo(infohash, **_): gigachannel_manager.download_manager = MockObject() gigachannel_manager.download_manager.download_exists = lambda _: True - mock_download = Mock() + mock_download = MagicMock() mock_download.get_state.get_status = DLSTATUS_SEEDING gigachannel_manager.download_manager.get_download = lambda _: mock_download diff --git a/src/tribler-core/tribler_core/components/libtorrent/download_manager/download.py b/src/tribler-core/tribler_core/components/libtorrent/download_manager/download.py index 0f93225371d..8d7453edc90 100644 --- a/src/tribler-core/tribler_core/components/libtorrent/download_manager/download.py +++ b/src/tribler-core/tribler_core/components/libtorrent/download_manager/download.py @@ -12,6 +12,7 @@ from ipv8.taskmanager import TaskManager, task from ipv8.util import int2byte, succeed +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.download_config import DownloadConfig from tribler_core.components.libtorrent.download_manager.download_state import DownloadState from tribler_core.components.libtorrent.download_manager.stream import Stream @@ -20,7 +21,7 @@ from tribler_core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt from tribler_core.components.libtorrent.utils.torrent_utils import check_handle, get_info_from_handle, require_handle from tribler_core.exceptions import SaveResumeDataError -from tribler_core.notifier import Notifier +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.osutils import fix_filebasename from tribler_core.utilities.path_util import Path from tribler_core.utilities.simpledefs import DLSTATUS_SEEDING, DLSTATUS_STOPPED, DOWNLOAD, NTFY @@ -388,11 +389,11 @@ def on_torrent_checked_alert(self, _): def on_torrent_finished_alert(self, _): self.update_lt_status(self.handle.status()) self.checkpoint() - if self.get_state().get_total_transferred(DOWNLOAD) > 0 and self.stream is not None: - if self.notifier is not None: - self.notifier.notify(NTFY.TORRENT_FINISHED.value, self.tdef.get_infohash(), - self.tdef.get_name_as_unicode(), self.hidden or - self.config.get_channel_download()) + downloaded = self.get_state().get_total_transferred(DOWNLOAD) + if downloaded > 0 and self.stream is not None and self.notifier is not None: + self.notifier[notifications.torrent_finished](infohash=self.tdef.get_infohash().hex(), + name=self.tdef.get_name_as_unicode(), + hidden=self.hidden or self.config.get_channel_download()) def update_lt_status(self, lt_status): """ Update libtorrent stats and check if the download should be stopped.""" diff --git a/src/tribler-core/tribler_core/components/libtorrent/download_manager/download_manager.py b/src/tribler-core/tribler_core/components/libtorrent/download_manager/download_manager.py index 7df1260d117..4c7ff7166a4 100644 --- a/src/tribler-core/tribler_core/components/libtorrent/download_manager/download_manager.py +++ b/src/tribler-core/tribler_core/components/libtorrent/download_manager/download_manager.py @@ -15,6 +15,7 @@ from ipv8.taskmanager import TaskManager, task +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.dht_health_manager import DHTHealthManager from tribler_core.components.libtorrent.download_manager.download import Download from tribler_core.components.libtorrent.download_manager.download_config import DownloadConfig @@ -22,9 +23,9 @@ from tribler_core.components.libtorrent.torrentdef import TorrentDef, TorrentDefNoMetainfo from tribler_core.components.libtorrent.utils import torrent_utils from tribler_core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt -from tribler_core.notifier import Notifier from tribler_core.utilities import path_util from tribler_core.utilities.network_utils import default_network_utils +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.path_util import Path from tribler_core.utilities.rest_utils import ( FILE_SCHEME, @@ -158,7 +159,7 @@ def initialize(self): self.set_download_states_callback(self.sesscb_states_callback) def notify_shutdown_state(self, state): - self.notifier.notify(NTFY.TRIBLER_SHUTDOWN_STATE.value, state) + self.notifier[notifications.tribler_shutdown_state](state) async def shutdown(self, timeout=30): if self.downloads: @@ -393,8 +394,8 @@ def process_alert(self, alert, hops=0): # We use the now-deprecated ``endpoint`` attribute for these older versions. self.listen_ports[hops] = getattr(alert, "port", alert.endpoint[1]) - elif alert_type == 'peer_disconnected_alert' and self.notifier: - self.notifier.notify(NTFY.PEER_DISCONNECTED_EVENT.value, alert.pid.to_bytes()) + elif alert_type == 'peer_disconnected_alert': + self.notifier[notifications.peer_disconnected](alert.pid.to_bytes()) elif alert_type == 'session_stats_alert': queued_disk_jobs = alert.values['disk.queued_disk_jobs'] @@ -814,8 +815,8 @@ async def sesscb_states_callback(self, states_list): if self.state_cb_count % 5 == 0 and download.config.get_hops() == 0 and self.notifier: for peer in download.get_peerlist(): if str(peer["extended_version"]).startswith('Tribler'): - self.notifier.notify(NTFY.TRIBLER_TORRENT_PEER_UPDATE.value, - unhexlify(peer["id"]), infohash, peer["dtotal"]) + self.notifier[notifications.tribler_torrent_peer_update](unhexlify(peer["id"]), infohash, + peer["dtotal"]) if self.state_cb_count % 4 == 0: self._last_states_list = states_list diff --git a/src/tribler-core/tribler_core/components/libtorrent/restapi/tests/test_torrentinfo_endpoint.py b/src/tribler-core/tribler_core/components/libtorrent/restapi/tests/test_torrentinfo_endpoint.py index ac81693195a..fc914f0ebf6 100644 --- a/src/tribler-core/tribler_core/components/libtorrent/restapi/tests/test_torrentinfo_endpoint.py +++ b/src/tribler-core/tribler_core/components/libtorrent/restapi/tests/test_torrentinfo_endpoint.py @@ -1,7 +1,7 @@ import json import shutil from binascii import unhexlify -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, patch from urllib.parse import quote_plus, unquote_plus from aiohttp.web_app import Application @@ -10,6 +10,7 @@ import pytest +from tribler_core import notifications from tribler_core.components.libtorrent.restapi.torrentinfo_endpoint import TorrentInfoEndpoint from tribler_core.components.libtorrent.settings import LibtorrentSettings from tribler_core.components.libtorrent.torrentdef import TorrentDef @@ -18,7 +19,6 @@ from tribler_core.components.restapi.rest.rest_manager import error_middleware from tribler_core.tests.tools.common import TESTS_DATA_DIR, TESTS_DIR, TORRENT_UBUNTU_FILE, UBUNTU_1504_INFOHASH from tribler_core.utilities.rest_utils import path_to_uri -from tribler_core.utilities.simpledefs import NTFY from tribler_core.utilities.unicode import hexlify SAMPLE_CHANNEL_FILES_DIR = TESTS_DIR / "data" / "sample_channel" @@ -29,7 +29,7 @@ @pytest.fixture def download_manager(state_dir): - dlmgr = Mock() + dlmgr = MagicMock() dlmgr.config = LibtorrentSettings() dlmgr.shutdown = lambda: succeed(None) checkpoints_dir = state_dir / 'dlcheckpoints' @@ -41,7 +41,7 @@ def download_manager(state_dir): dlmgr.metainfo_requests = {} dlmgr.get_channel_downloads = lambda: [] dlmgr.shutdown = lambda: succeed(None) - dlmgr.notifier = Mock() + dlmgr.notifier = MagicMock() return dlmgr @@ -129,7 +129,7 @@ def get_metainfo(infohash, timeout=20, hops=None, url=None): await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=500) # Ensure that correct torrent metadata was sent through notifier (to MetadataStore) - endpoint.download_manager.notifier.notify.assert_called_with(NTFY.TORRENT_METADATA_ADDED.value, metainfo_dict) + endpoint.download_manager.notifier[notifications.torrent_metadata_added].assert_called_with(metainfo_dict) endpoint.download_manager.get_metainfo = get_metainfo verify_valid_dict(await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=200)) @@ -142,7 +142,7 @@ def get_metainfo(infohash, timeout=20, hops=None, url=None): path = 'http://fdsafksdlafdslkdksdlfjs9fsafasdf7lkdzz32.n38/324.torrent' await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=500) - mock_download = Mock() + mock_download = MagicMock() path = quote_plus(f'magnet:?xt=urn:btih:{hexlify(UBUNTU_1504_INFOHASH)}&dn=test torrent') endpoint.download_manager.downloads = {UBUNTU_1504_INFOHASH: mock_download} result = await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=200) @@ -157,7 +157,7 @@ def get_metainfo(infohash, timeout=20, hops=None, url=None): # Check that we return "downloads_exists" if there is a metainfo download for the infohash, # but there is also a regular download for the same infohash endpoint.download_manager.downloads = {UBUNTU_1504_INFOHASH: mock_download} - endpoint.download_manager.metainfo_requests = {UBUNTU_1504_INFOHASH: [Mock()]} + endpoint.download_manager.metainfo_requests = {UBUNTU_1504_INFOHASH: [MagicMock()]} result = await do_request(rest_api, f'torrentinfo?uri={path}', expected_code=200) assert result["download_exists"] diff --git a/src/tribler-core/tribler_core/components/libtorrent/restapi/torrentinfo_endpoint.py b/src/tribler-core/tribler_core/components/libtorrent/restapi/torrentinfo_endpoint.py index c06f1ef8497..0db741e97f4 100644 --- a/src/tribler-core/tribler_core/components/libtorrent/restapi/torrentinfo_endpoint.py +++ b/src/tribler-core/tribler_core/components/libtorrent/restapi/torrentinfo_endpoint.py @@ -10,6 +10,7 @@ from marshmallow.fields import String +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager from tribler_core.components.libtorrent.torrentdef import TorrentDef from tribler_core.components.libtorrent.utils.libtorrent_helper import libtorrent as lt @@ -28,7 +29,6 @@ scheme_from_uri, uri_to_path, ) -from tribler_core.utilities.simpledefs import NTFY from tribler_core.utilities.unicode import hexlify, recursive_unicode from tribler_core.utilities.utilities import bdecode_compat, froze_it, parse_magnetlink @@ -126,8 +126,7 @@ async def get_torrent_info(self, request): return RESTResponse({"error": "invalid response"}, status=HTTP_INTERNAL_SERVER_ERROR) # Add the torrent to GigaChannel as a free-for-all entry, so others can search it - self.download_manager.notifier.notify( - NTFY.TORRENT_METADATA_ADDED.value, + self.download_manager.notifier[notifications.torrent_metadata_added]( tdef_to_metadata_dict(TorrentDef.load_from_dict(metainfo))) # TODO(Martijn): store the stuff in a database!!! diff --git a/src/tribler-core/tribler_core/components/libtorrent/tests/test_download_manager.py b/src/tribler-core/tribler_core/components/libtorrent/tests/test_download_manager.py index e034f43fe1d..bd97f864109 100644 --- a/src/tribler-core/tribler_core/components/libtorrent/tests/test_download_manager.py +++ b/src/tribler-core/tribler_core/components/libtorrent/tests/test_download_manager.py @@ -1,5 +1,5 @@ from asyncio import Future, gather, get_event_loop, sleep -from unittest.mock import Mock +from unittest.mock import MagicMock from ipv8.util import succeed @@ -23,7 +23,7 @@ def create_fake_download_and_state(): tdef = TorrentDef() tdef.get_infohash = lambda: b'aaaa' fake_peer = {'extended_version': 'Tribler', 'id': 'a' * 20, 'dtotal': 10 * 1024 * 1024} - fake_download = Mock() + fake_download = MagicMock() fake_download.get_def = lambda: tdef fake_download.get_def().get_name_as_unicode = lambda: "test.iso" fake_download.get_peerlist = lambda: [fake_peer] @@ -31,11 +31,11 @@ def create_fake_download_and_state(): fake_download.checkpoint = lambda: succeed(None) fake_download.stop = lambda: succeed(None) fake_download.shutdown = lambda: succeed(None) - dl_state = Mock() + dl_state = MagicMock() dl_state.get_infohash = lambda: b'aaaa' dl_state.get_status = lambda: DLSTATUS_SEEDING dl_state.get_download = lambda: fake_download - fake_config = Mock() + fake_config = MagicMock() fake_config.get_hops = lambda: 0 fake_config.get_safe_seeding = lambda: True fake_download.config = fake_config @@ -46,10 +46,10 @@ def create_fake_download_and_state(): @pytest.fixture async def fake_dlmgr(tmp_path_factory): config = LibtorrentSettings(dht_readiness_timeout=0) - dlmgr = DownloadManager(config=config, state_dir=tmp_path_factory.mktemp('state_dir'), notifier=Mock(), + dlmgr = DownloadManager(config=config, state_dir=tmp_path_factory.mktemp('state_dir'), notifier=MagicMock(), peer_mid=b"0000") dlmgr.metadata_tmpdir = tmp_path_factory.mktemp('metadata_tmpdir') - dlmgr.get_session = lambda *_, **__: Mock() + dlmgr.get_session = lambda *_, **__: MagicMock() yield dlmgr await dlmgr.shutdown(timeout=0) @@ -62,14 +62,14 @@ async def test_get_metainfo_valid_metadata(fake_dlmgr): infohash = b"a" * 20 metainfo = {b'info': {b'pieces': [b'a']}, b'leechers': 0, b'nodes': [], b'seeders': 0} - download_impl = Mock() + download_impl = MagicMock() download_impl.tdef.get_metainfo = lambda: None download_impl.future_metainfo = succeed(metainfo) fake_dlmgr.initialize() - fake_dlmgr.start_download = Mock(return_value=download_impl) + fake_dlmgr.start_download = MagicMock(return_value=download_impl) fake_dlmgr.download_defaults.number_hops = 1 - fake_dlmgr.remove_download = Mock(return_value=succeed(None)) + fake_dlmgr.remove_download = MagicMock(return_value=succeed(None)) assert await fake_dlmgr.get_metainfo(infohash) == metainfo fake_dlmgr.start_download.assert_called_once() @@ -84,15 +84,15 @@ async def test_get_metainfo_add_fail(fake_dlmgr): infohash = b"a" * 20 metainfo = {'pieces': ['a']} - download_impl = Mock() + download_impl = MagicMock() download_impl.future_metainfo = succeed(metainfo) download_impl.tdef.get_metainfo = lambda: None fake_dlmgr.initialize() - fake_dlmgr.start_download = Mock() + fake_dlmgr.start_download = MagicMock() fake_dlmgr.start_download.side_effect = TypeError fake_dlmgr.download_defaults.number_hops = 1 - fake_dlmgr.remove = Mock(return_value=succeed(None)) + fake_dlmgr.remove = MagicMock(return_value=succeed(None)) assert await fake_dlmgr.get_metainfo(infohash) is None fake_dlmgr.start_download.assert_called_once() @@ -107,15 +107,15 @@ async def test_get_metainfo_duplicate_request(fake_dlmgr): infohash = b"a" * 20 metainfo = {'pieces': ['a']} - download_impl = Mock() + download_impl = MagicMock() download_impl.tdef.get_metainfo = lambda: None download_impl.future_metainfo = Future() get_event_loop().call_later(0.1, download_impl.future_metainfo.set_result, metainfo) fake_dlmgr.initialize() - fake_dlmgr.start_download = Mock(return_value=download_impl) + fake_dlmgr.start_download = MagicMock(return_value=download_impl) fake_dlmgr.download_defaults.number_hops = 1 - fake_dlmgr.remove_download = Mock(return_value=succeed(None)) + fake_dlmgr.remove_download = MagicMock(return_value=succeed(None)) results = await gather(fake_dlmgr.get_metainfo(infohash), fake_dlmgr.get_metainfo(infohash)) assert results == [metainfo, metainfo] @@ -142,7 +142,7 @@ async def test_get_metainfo_with_already_added_torrent(fake_dlmgr): sample_torrent = TESTS_DATA_DIR / "bak_single.torrent" torrent_def = TorrentDef.load(sample_torrent) - download_impl = Mock() + download_impl = MagicMock() download_impl.future_metainfo = succeed(bencode(torrent_def.get_metainfo())) download_impl.checkpoint = lambda: succeed(None) download_impl.stop = lambda: succeed(None) @@ -161,17 +161,17 @@ async def test_start_download_while_getting_metainfo(fake_dlmgr): """ infohash = b"a" * 20 - metainfo_session = Mock() + metainfo_session = MagicMock() metainfo_session.get_torrents = lambda: [] - metainfo_dl = Mock() - metainfo_dl.get_def = lambda: Mock(get_infohash=lambda: infohash) + metainfo_dl = MagicMock() + metainfo_dl.get_def = lambda: MagicMock(get_infohash=lambda: infohash) fake_dlmgr.initialize() fake_dlmgr.get_session = lambda *_: metainfo_session fake_dlmgr.downloads[infohash] = metainfo_dl fake_dlmgr.metainfo_requests[infohash] = [metainfo_dl, 1] - fake_dlmgr.remove_download = Mock(return_value=succeed(None)) + fake_dlmgr.remove_download = MagicMock(return_value=succeed(None)) tdef = TorrentDefNoMetainfo(infohash, 'name', f'magnet:?xt=urn:btih:{hexlify(infohash)}&') download = fake_dlmgr.start_download(tdef=tdef, checkpoint_disabled=True) @@ -188,18 +188,18 @@ async def test_start_download(fake_dlmgr): """ infohash = b'a' * 20 - mock_handle = Mock() + mock_handle = MagicMock() mock_handle.info_hash = lambda: hexlify(infohash) mock_handle.is_valid = lambda: True - mock_error = Mock() + mock_error = MagicMock() mock_error.value = lambda: None mock_alert = type('add_torrent_alert', (object,), dict(handle=mock_handle, error=mock_error, category=lambda _: None))() - mock_ltsession = Mock() + mock_ltsession = MagicMock() mock_ltsession.get_torrents = lambda: [] mock_ltsession.async_add_torrent = lambda _: fake_dlmgr.register_task('post_alert', fake_dlmgr.process_alert, @@ -216,7 +216,7 @@ async def test_start_download(fake_dlmgr): # Test waiting on DHT getting enough nodes and adding the torrent after timing out fake_dlmgr.dht_readiness_timeout = 0.5 flag = [] - check_was_run = Mock() + check_was_run = MagicMock() async def mock_check(): while not flag: @@ -225,10 +225,10 @@ async def mock_check(): fake_dlmgr._check_dht_ready = mock_check fake_dlmgr.initialize() - mock_download = Mock() + mock_download = MagicMock() mock_download.get_def().get_infohash = lambda: b"1"*20 mock_download.future_added = succeed(True) - mock_ltsession.async_add_torrent = Mock() + mock_ltsession.async_add_torrent = MagicMock() await fake_dlmgr.start_handle(mock_download, {}) check_was_run.assert_called() fake_dlmgr.downloads.clear() @@ -248,11 +248,11 @@ async def test_start_download_existing_handle(fake_dlmgr): """ infohash = b'a' * 20 - mock_handle = Mock() + mock_handle = MagicMock() mock_handle.info_hash = lambda: hexlify(infohash) mock_handle.is_valid = lambda: True - mock_ltsession = Mock() + mock_ltsession = MagicMock() mock_ltsession.get_torrents = lambda: [mock_handle] fake_dlmgr.get_session = lambda *_: mock_ltsession @@ -271,10 +271,10 @@ async def test_start_download_existing_download(fake_dlmgr): """ infohash = b'a' * 20 - mock_download = Mock() - mock_download.get_def = lambda: Mock(get_trackers_as_single_tuple=lambda: ()) + mock_download = MagicMock() + mock_download.get_def = lambda: MagicMock(get_trackers_as_single_tuple=lambda: ()) - mock_ltsession = Mock() + mock_ltsession = MagicMock() fake_dlmgr.downloads[infohash] = mock_download fake_dlmgr.get_session = lambda *_: mock_ltsession @@ -298,7 +298,7 @@ def test_remove_unregistered_torrent(fake_dlmgr): Tests a successful removal status of torrents which aren't known """ fake_dlmgr.initialize() - mock_handle = Mock() + mock_handle = MagicMock() mock_handle.is_valid = lambda: False alert = type('torrent_removed_alert', (object, ), dict(handle=mock_handle, info_hash='0'*20)) fake_dlmgr.process_alert(alert()) @@ -326,7 +326,7 @@ def on_set_settings(settings): assert settings['proxy_peer_connections'] assert settings['proxy_hostnames'] - mock_lt_session = Mock() + mock_lt_session = MagicMock() mock_lt_session.get_settings = lambda: {} mock_lt_session.set_settings = on_set_settings mock_lt_session.set_proxy = on_proxy_set # Libtorrent < 1.1.0 uses set_proxy to set proxy settings @@ -337,8 +337,8 @@ def test_payout_on_disconnect(fake_dlmgr): """ Test whether a payout is initialized when a peer disconnects """ - disconnect_alert = type('peer_disconnected', (object,), dict(pid=Mock(to_bytes=lambda: b'a' * 20)))() - fake_dlmgr.payout_manager = Mock() + disconnect_alert = type('peer_disconnected', (object,), dict(pid=MagicMock(to_bytes=lambda: b'a' * 20)))() + fake_dlmgr.payout_manager = MagicMock() fake_dlmgr.initialize() fake_dlmgr.get_session(0).pop_alerts = lambda: [disconnect_alert] fake_dlmgr._task_process_alerts() @@ -350,7 +350,7 @@ async def test_post_session_stats(fake_dlmgr): """ Test whether post_session_stats actually updates the state of libtorrent readiness for clean shutdown. """ - mock_lt_session = Mock() + mock_lt_session = MagicMock() fake_dlmgr.ltsessions[0] = mock_lt_session # Check for status with session stats alert @@ -386,7 +386,7 @@ def test_load_empty_checkpoint(fake_dlmgr, tmpdir): Test whether download resumes with faulty pstate file. """ fake_dlmgr.get_downloads_pstate_dir = lambda: tmpdir - fake_dlmgr.start_download = Mock() + fake_dlmgr.start_download = MagicMock() # Empty pstate file pstate_filename = fake_dlmgr.get_downloads_pstate_dir() / 'abcd.state' @@ -448,7 +448,7 @@ def test_get_downloads_by_name(fake_dlmgr): @pytest.mark.asyncio async def test_check_for_dht_ready(fake_dlmgr): - fake_dlmgr.get_session = Mock() + fake_dlmgr.get_session = MagicMock() fake_dlmgr.get_session().status().dht_nodes = 1000 # If the session has enough peers, it should finish instantly await fake_dlmgr._check_dht_ready() diff --git a/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py b/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py index e7e15b620d7..f41c0cfaf40 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py +++ b/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py @@ -4,16 +4,16 @@ from pony import orm from pony.orm import db_session +from tribler_core import notifications from tribler_core.components.metadata_store.category_filter.category import default_category_filter from tribler_core.components.metadata_store.category_filter.family_filter import default_xxx_filter from tribler_core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED from tribler_core.components.metadata_store.db.serialization import EPOCH, REGULAR_TORRENT, TorrentMetadataPayload -from tribler_core.notifier import Notifier +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.tracker_utils import get_uniformed_tracker_url from tribler_core.utilities.unicode import ensure_unicode, hexlify NULL_KEY_SUBST = b"\00" -NEW_TORRENT_METADATA_CREATED: str = 'TorrentMetadata:new_torrent_metadata_created' # This function is used to devise id_ from infohash in deterministic way. Used in FFA channels. @@ -90,9 +90,7 @@ def __init__(self, *args, **kwargs): if 'tracker_info' in kwargs: self.add_tracker(kwargs["tracker_info"]) if notifier: - notifier.notify(NEW_TORRENT_METADATA_CREATED, - infohash=kwargs.get("infohash"), - title=self.title) + notifier[notifications.new_torrent_metadata_created](infohash=kwargs.get("infohash"), title=self.title) self.tag_processor_version = tag_processor_version def add_tracker(self, tracker_url): @@ -111,19 +109,19 @@ def get_magnet(self): @classmethod @db_session - def add_ffa_from_dict(cls, ffa_dict): + def add_ffa_from_dict(cls, metadata: dict): # To produce a relatively unique id_ we take some bytes of the infohash and convert these to a number. # abs is necessary as the conversion can produce a negative value, and we do not support that. - id_ = infohash_to_id(ffa_dict["infohash"]) + id_ = infohash_to_id(metadata["infohash"]) # Check that this torrent is yet unknown to GigaChannel, and if there is no duplicate FFA entry. # Test for a duplicate id_+public_key is necessary to account for a (highly improbable) situation when # two entries have different infohashes but the same id_. We do not want people to exploit this. - ih_blob = ffa_dict["infohash"] + ih_blob = metadata["infohash"] pk_blob = b"" if cls.exists(lambda g: (g.infohash == ih_blob) or (g.id_ == id_ and g.public_key == pk_blob)): return None # Add the torrent as a free-for-all entry if it is unknown to GigaChannel - return cls.from_dict(dict(ffa_dict, public_key=b'', status=COMMITTED, id_=id_)) + return cls.from_dict(dict(metadata, public_key=b'', status=COMMITTED, id_=id_)) @db_session def to_simple_dict(self): diff --git a/src/tribler-core/tribler_core/components/metadata_store/db/store.py b/src/tribler-core/tribler_core/components/metadata_store/db/store.py index 6704bb62eaa..16f0089970a 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/db/store.py +++ b/src/tribler-core/tribler_core/components/metadata_store/db/store.py @@ -11,6 +11,7 @@ from pony import orm from pony.orm import db_session, desc, left_join, raw_sql, select +from tribler_core import notifications from tribler_core.components.metadata_store.db.orm_bindings import ( binary_node, channel_description, @@ -46,9 +47,9 @@ ) from tribler_core.components.metadata_store.remote_query_community.payload_checker import process_payload from tribler_core.exceptions import InvalidSignatureException +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.path_util import Path from tribler_core.utilities.pony_utils import get_or_create -from tribler_core.utilities.simpledefs import NTFY from tribler_core.utilities.unicode import hexlify from tribler_core.utilities.utilities import MEMORY_DB @@ -140,7 +141,7 @@ def __init__( channels_dir, my_key, disable_sync=False, - notifier=None, + notifier: Notifier = None, check_tables=True, db_version: int = CURRENT_DB_VERSION, tag_processor_version: int = 0 @@ -423,7 +424,7 @@ def process_channel_dir(self, dirname, public_key, id_, **kwargs): if self.notifier: channel_update_dict = channel.to_simple_dict() channel_update_dict["progress"] = float(processed_blobs_size) / total_blobs_size - self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED, channel_update_dict) + self.notifier[notifications.channel_entity_updated](channel_update_dict) except InvalidSignatureException: self._logger.error("Not processing metadata located at %s: invalid signature", full_filename) diff --git a/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py b/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py index 141f6f2fa08..ea3245f58bd 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py +++ b/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py @@ -1,8 +1,9 @@ +from tribler_core import notifications from tribler_core.components.base import Component from tribler_core.components.key.key_component import KeyComponent from tribler_core.components.metadata_store.db.store import MetadataStore from tribler_core.components.tag.rules.tag_rules_processor import TagRulesProcessor -from tribler_core.utilities.simpledefs import NTFY, STATEDIR_DB_DIR +from tribler_core.utilities.simpledefs import STATEDIR_DB_DIR class MetadataStoreComponent(Component): @@ -43,7 +44,7 @@ async def run(self): tag_processor_version=TagRulesProcessor.version ) self.mds = metadata_store - self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED.value, + self.session.notifier.add_observer(notifications.torrent_metadata_added, metadata_store.TorrentMetadata.add_ffa_from_dict) async def shutdown(self): diff --git a/src/tribler-core/tribler_core/components/metadata_store/restapi/tests/test_metadata_endpoint.py b/src/tribler-core/tribler_core/components/metadata_store/restapi/tests/test_metadata_endpoint.py index 4f54ee34ae7..e35d79f1a32 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/restapi/tests/test_metadata_endpoint.py +++ b/src/tribler-core/tribler_core/components/metadata_store/restapi/tests/test_metadata_endpoint.py @@ -1,5 +1,5 @@ import json -from unittest.mock import Mock +from unittest.mock import MagicMock from aiohttp.web_app import Application @@ -27,9 +27,9 @@ async def torrent_checker(loop, mock_dlmgr, metadata_store): # Initialize the torrent checker config = TriblerConfig() config.download_defaults.number_hops = 0 - tracker_manager = Mock() + tracker_manager = MagicMock() tracker_manager.blacklist = [] - notifier = Mock() + notifier = MagicMock() torrent_checker = TorrentChecker( config=config, download_manager=mock_dlmgr, diff --git a/src/tribler-core/tribler_core/components/metadata_store/tests/test_channel_download.py b/src/tribler-core/tribler_core/components/metadata_store/tests/test_channel_download.py index 280c925b4c8..6d5d2d54d98 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/tests/test_channel_download.py +++ b/src/tribler-core/tribler_core/components/metadata_store/tests/test_channel_download.py @@ -1,4 +1,4 @@ -from asynctest import Mock +from asynctest import MagicMock from ipv8.util import succeed @@ -34,7 +34,7 @@ async def channel_seeder(channel_tdef, loop, tmp_path_factory): # pylint: disab config.upnp = False config.natpmp = False config.lsd = False - seeder_dlmgr = DownloadManager(state_dir=tmp_path_factory.mktemp('state_dir'), config=config, notifier=Mock(), + seeder_dlmgr = DownloadManager(state_dir=tmp_path_factory.mktemp('state_dir'), config=config, notifier=MagicMock(), peer_mid=b"0000") seeder_dlmgr.metadata_tmpdir = tmp_path_factory.mktemp('metadata_tmpdir') seeder_dlmgr.initialize() @@ -52,7 +52,7 @@ async def gigachannel_manager(metadata_store, download_manager): state_dir=metadata_store.channels_dir.parent, download_manager=download_manager, metadata_store=metadata_store, - notifier=Mock(), + notifier=MagicMock(), ) yield gigachannel_manager await gigachannel_manager.shutdown() diff --git a/src/tribler-core/tribler_core/components/payout/payout_component.py b/src/tribler-core/tribler_core/components/payout/payout_component.py index 34856acbe57..b65240fff66 100644 --- a/src/tribler-core/tribler_core/components/payout/payout_component.py +++ b/src/tribler-core/tribler_core/components/payout/payout_component.py @@ -1,9 +1,9 @@ +from tribler_core import notifications from tribler_core.components.bandwidth_accounting.bandwidth_accounting_component import BandwidthAccountingComponent from tribler_core.components.base import Component from tribler_core.components.ipv8.ipv8_component import Ipv8Component from tribler_core.components.payout.payout_manager import PayoutManager from tribler_core.components.reporter.reporter_component import ReporterComponent -from tribler_core.utilities.simpledefs import NTFY INFINITE = -1 @@ -25,15 +25,15 @@ async def run(self): self.payout_manager = PayoutManager(bandwidth_accounting_component.community, ipv8_component.dht_discovery_community) - self.session.notifier.add_observer(NTFY.PEER_DISCONNECTED_EVENT.value, self.payout_manager.do_payout) - self.session.notifier.add_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE.value, self.payout_manager.update_peer) + self.session.notifier.add_observer(notifications.peer_disconnected, self.payout_manager.on_peer_disconnected) + self.session.notifier.add_observer(notifications.tribler_torrent_peer_update, self.payout_manager.update_peer) async def shutdown(self): await super().shutdown() if self.payout_manager: - self.session.notifier.remove_observer(NTFY.PEER_DISCONNECTED_EVENT.value, self.payout_manager.do_payout) - self.session.notifier.remove_observer(NTFY.TRIBLER_TORRENT_PEER_UPDATE.value, - self.payout_manager.update_peer) + notifier = self.session.notifier + notifier.remove_observer(notifications.peer_disconnected, self.payout_manager.on_peer_disconnected) + notifier.remove_observer(notifications.tribler_torrent_peer_update, self.payout_manager.update_peer) await self.payout_manager.shutdown() diff --git a/src/tribler-core/tribler_core/components/payout/payout_manager.py b/src/tribler-core/tribler_core/components/payout/payout_manager.py index e07c67887d3..e7c30aa2cf7 100644 --- a/src/tribler-core/tribler_core/components/payout/payout_manager.py +++ b/src/tribler-core/tribler_core/components/payout/payout_manager.py @@ -17,19 +17,23 @@ def __init__(self, bandwidth_community, dht): self.dht = dht self.tribler_peers = {} + def on_peer_disconnected(self, peer_id: bytes): + # do_payout is not specified directly, as PyCharm does not understand its type correctly due to a task decorator + self.do_payout(peer_id) + @task - async def do_payout(self, mid): + async def do_payout(self, peer_id: bytes): """ Perform a payout to a given mid. First, determine the outstanding balance. Then resolve the node in the DHT. """ - if mid not in self.tribler_peers: + if peer_id not in self.tribler_peers: return None - total_bytes = sum(self.tribler_peers[mid].values()) + total_bytes = sum(self.tribler_peers[peer_id].values()) - self.logger.info("Doing direct payout to %s (%d bytes)", hexlify(mid), total_bytes) + self.logger.info("Doing direct payout to %s (%d bytes)", hexlify(peer_id), total_bytes) try: - nodes = await self.dht.connect_peer(mid) + nodes = await self.dht.connect_peer(peer_id) except Exception as e: self.logger.warning("Error while doing DHT lookup for payouts, error %s", e) return None @@ -45,20 +49,20 @@ async def do_payout(self, mid): return None # Remove the outstanding bytes; otherwise we will payout again - self.tribler_peers.pop(mid, None) + self.tribler_peers.pop(peer_id, None) return nodes[0] - def update_peer(self, mid, infohash, balance): + def update_peer(self, peer_id: bytes, infohash: bytes, balance: int): """ Update a peer with a specific mid for a specific infohash. """ - self.logger.debug("Updating peer with mid %s and ih %s (balance: %d)", hexlify(mid), + self.logger.debug("Updating peer with mid %s and ih %s (balance: %d)", hexlify(peer_id), hexlify(infohash), balance) - if mid not in self.tribler_peers: - self.tribler_peers[mid] = {} + if peer_id not in self.tribler_peers: + self.tribler_peers[peer_id] = {} - self.tribler_peers[mid][infohash] = balance + self.tribler_peers[peer_id][infohash] = balance async def shutdown(self): await self.shutdown_task_manager() diff --git a/src/tribler-core/tribler_core/components/resource_monitor/implementation/core.py b/src/tribler-core/tribler_core/components/resource_monitor/implementation/core.py index 48e0f0bae5b..96c7afeca30 100644 --- a/src/tribler-core/tribler_core/components/resource_monitor/implementation/core.py +++ b/src/tribler-core/tribler_core/components/resource_monitor/implementation/core.py @@ -6,11 +6,11 @@ import psutil +from tribler_core import notifications from tribler_core.components.resource_monitor.implementation.base import ResourceMonitor from tribler_core.components.resource_monitor.implementation.profiler import YappiProfiler from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings -from tribler_core.notifier import Notifier -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier FREE_DISK_THRESHOLD = 100 * (1024 * 1024) # 100MB DEFAULT_RESOURCE_FILENAME = "resources.log" @@ -102,7 +102,7 @@ def record_disk_usage(self, recorded_at=None): if disk_usage.free < FREE_DISK_THRESHOLD: self._logger.warning("Warning! Less than 100MB of disk space available") if self.notifier: - self.notifier.notify(NTFY.LOW_SPACE.value, self.disk_usage_data[-1]) + self.notifier[notifications.low_space](self.disk_usage_data[-1]) def get_free_disk_space(self): return psutil.disk_usage(str(self.state_dir)) diff --git a/src/tribler-core/tribler_core/components/resource_monitor/implementation/tests/test_resource_monitor.py b/src/tribler-core/tribler_core/components/resource_monitor/implementation/tests/test_resource_monitor.py index 86f4901c9d5..2820e7f8ba7 100644 --- a/src/tribler-core/tribler_core/components/resource_monitor/implementation/tests/test_resource_monitor.py +++ b/src/tribler-core/tribler_core/components/resource_monitor/implementation/tests/test_resource_monitor.py @@ -2,23 +2,21 @@ import random import time from collections import deque, namedtuple -from unittest.mock import Mock +from unittest.mock import MagicMock import pytest +from tribler_core import notifications from tribler_core.components.resource_monitor.implementation.core import CoreResourceMonitor from tribler_core.components.resource_monitor.settings import ResourceMonitorSettings -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier @pytest.fixture(name="resource_monitor") async def fixture_resource_monitor(tmp_path): config = ResourceMonitorSettings() - notifier = Mock() - resource_monitor = CoreResourceMonitor(state_dir=tmp_path, - log_dir=tmp_path, - config=config, - notifier=notifier, + notifier = Notifier() + resource_monitor = CoreResourceMonitor(state_dir=tmp_path, log_dir=tmp_path, config=config, notifier=notifier, history_size=10) yield resource_monitor await resource_monitor.stop() @@ -84,12 +82,12 @@ def fake_get_free_disk_space(): disk = {"total": 318271800, "used": 312005050, "free": 6266750, "percent": 98.0} return namedtuple('sdiskusage', disk.keys())(*disk.values()) - def on_notify(subject, *args): - assert subject in [NTFY.LOW_SPACE.value, NTFY.TRIBLER_SHUTDOWN_STATE.value] + resource_monitor.notifier = MagicMock() resource_monitor.get_free_disk_space = fake_get_free_disk_space - resource_monitor.notifier.notify = on_notify resource_monitor.check_resources() + resource_monitor.notifier[notifications.low_space].assert_called() + resource_monitor.notifier[notifications.tribler_shutdown_state].assert_called() def test_resource_log(resource_monitor): diff --git a/src/tribler-core/tribler_core/components/restapi/rest/events_endpoint.py b/src/tribler-core/tribler_core/components/restapi/rest/events_endpoint.py index 60312974eac..e07ad16aebf 100644 --- a/src/tribler-core/tribler_core/components/restapi/rest/events_endpoint.py +++ b/src/tribler-core/tribler_core/components/restapi/rest/events_endpoint.py @@ -14,12 +14,11 @@ from marshmallow.fields import Dict, String +from tribler_core import notifications from tribler_core.components.reporter.reported_error import ReportedError from tribler_core.components.restapi.rest.rest_endpoint import RESTEndpoint, RESTStreamResponse from tribler_core.components.restapi.rest.util import fix_unicode_dict -from tribler_core.notifier import Notifier -from tribler_core.utilities.simpledefs import NTFY -from tribler_core.utilities.unicode import hexlify +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.utilities import froze_it from tribler_core.version import version_id @@ -28,30 +27,18 @@ def passthrough(x): return x -# pylint: disable=line-too-long -reactions_dict = { - # A corrupt .torrent file in the watch folder is found. Contains the name of the corrupt torrent file. - NTFY.WATCH_FOLDER_CORRUPT_FILE: lambda text: {"name": text}, - # A new version of Tribler is available. - NTFY.TRIBLER_NEW_VERSION: lambda text: {"version": text}, - # Tribler has discovered a new channel. Contains the channel data. - NTFY.CHANNEL_DISCOVERED: passthrough, - # A torrent has finished downloading. Contains the infohash and the name of the torrent - NTFY.TORRENT_FINISHED: lambda *args: {"infohash": hexlify(args[0]), "name": args[1], "hidden": args[2]}, - # Information about some torrent has been updated (e.g. health). Contains updated torrent data - NTFY.CHANNEL_ENTITY_UPDATED: passthrough, - # Tribler is going to shutdown. - NTFY.TRIBLER_SHUTDOWN_STATE: passthrough, - # Remote GigaChannel search results were received by Tribler. Contains received entries. - NTFY.REMOTE_QUERY_RESULTS: passthrough, - # Tribler is low on disk space for storing torrents - NTFY.LOW_SPACE: passthrough, - # Report config error on startup - NTFY.REPORT_CONFIG_ERROR: passthrough, -} - - -# pylint: enable=line-too-long +topics_to_send_to_gui = [ + notifications.tunnel_removed, + notifications.watch_folder_corrupt_file, + notifications.tribler_new_version, + notifications.channel_discovered, + notifications.torrent_finished, + notifications.channel_entity_updated, + notifications.tribler_shutdown_state, + notifications.remote_query_results, + notifications.low_space, + notifications.report_config_error, +] @froze_it @@ -67,31 +54,22 @@ def __init__(self, notifier: Notifier, public_key: str = None): TaskManager.__init__(self) self.events_responses: List[RESTStreamResponse] = [] self.app.on_shutdown.append(self.on_shutdown) - self.notifier = None self.undelivered_error: Optional[dict] = None - self.connect_notifier(notifier) self.public_key = public_key - - def connect_notifier(self, notifier: Notifier): self.notifier = notifier + notifier.add_observer(notifications.circuit_removed, self.on_circuit_removed) + notifier.add_generic_observer(self.on_notification) - for event_type, event_lambda in reactions_dict.items(): - self.notifier.add_observer(event_type.value, - lambda *args, el=event_lambda, et=event_type: - self.write_data({"type": et.value, "event": el(*args)})) - - def on_circuit_removed(circuit, *args): - if isinstance(circuit, Circuit): - event = { - "circuit_id": circuit.circuit_id, - "bytes_up": circuit.bytes_up, - "bytes_down": circuit.bytes_down, - "uptime": time.time() - circuit.creation_time - } - self.write_data({"type": NTFY.TUNNEL_REMOVE.value, "event": event}) + def on_notification(self, topic, *args, **kwargs): + if topic in topics_to_send_to_gui: + self.write_data({"topic": topic.__name__, "args": args, "kwargs": kwargs}) - # Tribler tunnel circuit has been removed - self.notifier.add_observer(NTFY.TUNNEL_REMOVE.value, on_circuit_removed) + def on_circuit_removed(self, circuit: Circuit, additional_info: str): + # The original notification contains non-JSON-serializable argument, so we send another one to GUI + self.notifier[notifications.tunnel_removed](circuit_id=circuit.circuit_id, bytes_up=circuit.bytes_up, + bytes_down=circuit.bytes_down, + uptime=time.time() - circuit.creation_time, + additional_info=additional_info) async def on_shutdown(self, _): await self.shutdown_task_manager() @@ -101,14 +79,14 @@ def setup_routes(self): def initial_message(self) -> dict: return { - "type": NTFY.EVENTS_START.value, - "event": {"public_key": self.public_key, "version": version_id} + "topic": notifications.events_start.__name__, + "kwargs": {"public_key": self.public_key, "version": version_id} } def error_message(self, reported_error: ReportedError) -> dict: return { - "type": NTFY.TRIBLER_EXCEPTION.value, - "event": asdict(reported_error), + "topic": notifications.tribler_exception.__name__, + "kwargs": {"error": asdict(reported_error)}, } def encode_message(self, message: dict) -> bytes: @@ -130,8 +108,13 @@ async def write_data(self, message): """ if not self.has_connection_to_gui(): return + try: + message_bytes = self.encode_message(message) + except Exception as e: # pylint: disable=broad-except + # if a notification arguments contains non-JSON-serializable data, the exception should be logged + self._logger.exception(e) + return - message_bytes = self.encode_message(message) for response in self.events_responses: await response.write(message_bytes) diff --git a/src/tribler-core/tribler_core/components/restapi/rest/tests/test_debug_endpoint.py b/src/tribler-core/tribler_core/components/restapi/rest/tests/test_debug_endpoint.py index 96c53010f2d..c6295dbed0b 100644 --- a/src/tribler-core/tribler_core/components/restapi/rest/tests/test_debug_endpoint.py +++ b/src/tribler-core/tribler_core/components/restapi/rest/tests/test_debug_endpoint.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, patch from aiohttp.web_app import Application @@ -15,7 +15,7 @@ @pytest.fixture def mock_tunnel_community(): - return Mock() + return MagicMock() @pytest.fixture @@ -27,7 +27,7 @@ def endpoint(tmp_path, mock_tunnel_community, core_resource_monitor): # pylint: @pytest.fixture async def core_resource_monitor(tmp_path): - resource_monitor = CoreResourceMonitor(notifier=Mock(), + resource_monitor = CoreResourceMonitor(notifier=MagicMock(), state_dir=tmp_path, config=ResourceMonitorSettings(), log_dir=tmp_path / 'logs') diff --git a/src/tribler-core/tribler_core/components/restapi/rest/tests/test_events_endpoint.py b/src/tribler-core/tribler_core/components/restapi/rest/tests/test_events_endpoint.py index 2c663e0d07b..4efcfada49f 100644 --- a/src/tribler-core/tribler_core/components/restapi/rest/tests/test_events_endpoint.py +++ b/src/tribler-core/tribler_core/components/restapi/rest/tests/test_events_endpoint.py @@ -5,18 +5,16 @@ from aiohttp import ClientSession -from ipv8.messaging.anonymization.tunnel import Circuit - import pytest +from tribler_core import notifications from tribler_core.components.reporter.reported_error import ReportedError from tribler_core.components.restapi.rest.events_endpoint import EventsEndpoint from tribler_core.components.restapi.rest.rest_endpoint import RESTStreamResponse from tribler_core.components.restapi.rest.rest_manager import ApiKeyMiddleware, RESTManager, error_middleware from tribler_core.components.restapi.rest.root_endpoint import RootEndpoint from tribler_core.config.tribler_config import TriblerConfig -from tribler_core.notifier import Notifier -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier from tribler_core.version import version_id messages_to_wait_for = set() @@ -28,8 +26,8 @@ def fixture_api_port(free_port): @pytest.fixture(name='notifier') -def fixture_notifier(): - return Notifier() +def fixture_notifier(loop): + return Notifier(loop=loop) @pytest.fixture(name='endpoint') @@ -42,8 +40,8 @@ def fixture_reported_error(): return ReportedError('type', 'text', {}) -@pytest.fixture -async def rest_manager(api_port, tmp_path, endpoint): +@pytest.fixture(name="rest_manager") +async def fixture_rest_manager(api_port, tmp_path, endpoint): config = TriblerConfig() config.api.http_enabled = True config.api.http_port = api_port @@ -71,14 +69,15 @@ async def open_events_socket(rest_manager_, connected_event, events_up): while True: msg = await response.content.readline() await response.content.readline() - messages_to_wait_for.remove(json.loads(msg[5:])["type"]) + topic_name = json.loads(msg[5:])["topic"] + messages_to_wait_for.remove(topic_name) if not messages_to_wait_for: events_up.set() break @pytest.mark.asyncio -async def test_events(rest_manager, notifier): +async def test_events(rest_manager, notifier: Notifier): """ Testing whether various events are coming through the events endpoints """ @@ -89,24 +88,22 @@ async def test_events(rest_manager, notifier): event_socket_task = create_task(open_events_socket(rest_manager, connected_event, events_up)) await connected_event.wait() - testdata = { - NTFY.CHANNEL_ENTITY_UPDATED: {"state": "Complete"}, - NTFY.WATCH_FOLDER_CORRUPT_FILE: ("foo",), - NTFY.TRIBLER_NEW_VERSION: ("123",), - NTFY.CHANNEL_DISCOVERED: {"result": "bla"}, - NTFY.TORRENT_FINISHED: (b'a' * 10, None, False), - NTFY.LOW_SPACE: ("",), - NTFY.TUNNEL_REMOVE: (Circuit(1234, None), 'test'), - NTFY.REMOTE_QUERY_RESULTS: {"query": "test"}, + notifier[notifications.channel_entity_updated]({"state": "Complete"}) + notifier[notifications.watch_folder_corrupt_file]("some_file_name") + notifier[notifications.tribler_new_version]("1.2.3") + notifier[notifications.channel_discovered]({"result": "bla"}) + notifier[notifications.torrent_finished]('a' * 10, "torrent_name", False) + notifier[notifications.low_space]({}) + notifier[notifications.tunnel_removed](circuit_id=1234, bytes_up=0, bytes_down=0, uptime=1000, + additional_info='test') + notifier[notifications.remote_query_results]({"query": "test"}) + rest_manager.root_endpoint.endpoints['/events'].on_tribler_exception(ReportedError('', '', {})) + + messages_to_wait_for = { + 'channel_entity_updated', 'watch_folder_corrupt_file', 'tribler_new_version', 'channel_discovered', + 'torrent_finished', 'low_space', 'tunnel_removed', 'remote_query_results', 'tribler_exception' } - messages_to_wait_for = {k.value for k in testdata} - messages_to_wait_for.add(NTFY.TRIBLER_EXCEPTION.value) - for subject, data in testdata.items(): - if data: - notifier.notify(subject.value, *data) - else: - notifier.notify(subject.value) - rest_manager.root_endpoint.endpoints['/events'].on_tribler_exception(ReportedError('', '', {}, False)) + await events_up.wait() event_socket_task.cancel() diff --git a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py index 595dc89aff2..c40f8295936 100644 --- a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py +++ b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py @@ -5,12 +5,12 @@ from pony.orm import db_session -import tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata as torrent_metadata -import tribler_core.components.metadata_store.db.store as MDS +from tribler_core import notifications from tribler_core.components.metadata_store.db.serialization import REGULAR_TORRENT +from tribler_core.components.metadata_store.db.store import MetadataStore from tribler_core.components.tag.db.tag_db import TagDatabase from tribler_core.components.tag.rules.tag_rules import extract_only_valid_tags -from tribler_core.notifier import Notifier +from tribler_core.utilities.notifier import Notifier DEFAULT_INTERVAL = 10 DEFAULT_BATCH_SIZE = 1000 @@ -22,7 +22,7 @@ class TagRulesProcessor(TaskManager): # this value must be incremented in the case of new rules set has been applied version: int = 1 - def __init__(self, notifier: Notifier, db: TagDatabase, mds: MDS.MetadataStore, + def __init__(self, notifier: Notifier, db: TagDatabase, mds: MetadataStore, batch_size: int = DEFAULT_BATCH_SIZE, interval: float = DEFAULT_INTERVAL): """ Default values for batch_size and interval are chosen so that tag processing is not too heavy @@ -36,8 +36,8 @@ def __init__(self, notifier: Notifier, db: TagDatabase, mds: MDS.MetadataStore, self.mds = mds self.batch_size = batch_size self.interval = interval - self.notifier.add_observer(torrent_metadata.NEW_TORRENT_METADATA_CREATED, - callback=self.process_torrent_title) + self.notifier.add_observer(notifications.new_torrent_metadata_created, self.process_torrent_title) + @db_session def start(self): self.logger.info('Start') diff --git a/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py b/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py index 2e972bdab29..14ce6b140ca 100644 --- a/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py +++ b/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py @@ -1,9 +1,9 @@ from types import SimpleNamespace -from unittest.mock import Mock, patch +from unittest.mock import MagicMock, patch import pytest -from tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW_TORRENT_METADATA_CREATED +from tribler_core import notifications from tribler_core.components.tag.rules.tag_rules_processor import LAST_PROCESSED_TORRENT_ID, TagRulesProcessor TEST_BATCH_SIZE = 100 @@ -13,7 +13,8 @@ # pylint: disable=redefined-outer-name, protected-access @pytest.fixture def tag_rules_processor(): - return TagRulesProcessor(notifier=Mock(), db=Mock(), mds=Mock(), batch_size=TEST_BATCH_SIZE, interval=TEST_INTERVAL) + return TagRulesProcessor(notifier=MagicMock(), db=MagicMock(), mds=MagicMock(), batch_size=TEST_BATCH_SIZE, + interval=TEST_INTERVAL) def test_constructor(tag_rules_processor: TagRulesProcessor): @@ -21,12 +22,12 @@ def test_constructor(tag_rules_processor: TagRulesProcessor): assert tag_rules_processor.batch_size == TEST_BATCH_SIZE assert tag_rules_processor.interval == TEST_INTERVAL - m: Mock = tag_rules_processor.notifier.add_observer - m.assert_called_with(NEW_TORRENT_METADATA_CREATED, callback=tag_rules_processor.process_torrent_title) + m: MagicMock = tag_rules_processor.notifier.add_observer + m.assert_called_with(notifications.new_torrent_metadata_created, tag_rules_processor.process_torrent_title) @patch.object(TagRulesProcessor, 'save_tags') -def test_process_torrent_file(mocked_save_tags: Mock, tag_rules_processor: TagRulesProcessor): +def test_process_torrent_file(mocked_save_tags: MagicMock, tag_rules_processor: TagRulesProcessor): # test on None assert not tag_rules_processor.process_torrent_title(infohash=None, title='title') assert not tag_rules_processor.process_torrent_title(infohash=b'infohash', title=None) @@ -52,7 +53,7 @@ def test_save_tags(tag_rules_processor: TagRulesProcessor): assert [c for c in actual_calls if c not in expected_calls] == [] -@patch.object(TagRulesProcessor, 'process_torrent_title', new=Mock(return_value=1)) +@patch.object(TagRulesProcessor, 'process_torrent_title', new=MagicMock(return_value=1)) def test_process_batch_within_the_boundary(tag_rules_processor: TagRulesProcessor): # test inner logic of `process_batch` in case this batch located within the boundary returned_batch_size = TEST_BATCH_SIZE // 2 # let's return a half of requested items @@ -73,7 +74,7 @@ def select(_): tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, str(TEST_BATCH_SIZE)) -@patch.object(TagRulesProcessor, 'process_torrent_title', new=Mock(return_value=1)) +@patch.object(TagRulesProcessor, 'process_torrent_title', new=MagicMock(return_value=1)) def test_process_batch_beyond_the_boundary(tag_rules_processor: TagRulesProcessor): # test inner logic of `process_batch` in case this batch located on a border returned_batch_size = TEST_BATCH_SIZE // 2 # let's return a half of requested items diff --git a/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py b/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py index 5865b19477b..3fa69eab020 100644 --- a/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py +++ b/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py @@ -3,7 +3,7 @@ import secrets import time -from asynctest import Mock +from asynctest import MagicMock from ipv8.util import succeed @@ -29,8 +29,8 @@ def tracker_manager(tmp_path, metadata_store): async def fixture_torrent_checker(tribler_config, tracker_manager, metadata_store): torrent_checker = TorrentChecker(config=tribler_config, - download_manager=Mock(), - notifier=Mock(), + download_manager=MagicMock(), + notifier=MagicMock(), metadata_store=metadata_store, tracker_manager=tracker_manager ) @@ -368,7 +368,7 @@ def add_torrent_to_channel(infohash, last_check): torrent.health.last_check = last_check return torrent - check_torrent_health_mock = Mock(return_value=None) + check_torrent_health_mock = MagicMock(return_value=None) torrent_checker.check_torrent_health = lambda _: check_torrent_health_mock() # No torrents yet in channel, the selected channel torrents to check should be empty diff --git a/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/torrent_checker.py b/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/torrent_checker.py index 2d61f1ac026..72b4aa7df74 100644 --- a/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/torrent_checker.py +++ b/src/tribler-core/tribler_core/components/torrent_checker/torrent_checker/torrent_checker.py @@ -9,6 +9,7 @@ from pony.orm import db_session, desc, select +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager from tribler_core.components.metadata_store.db.serialization import REGULAR_TORRENT from tribler_core.components.metadata_store.db.store import MetadataStore @@ -20,8 +21,7 @@ ) from tribler_core.components.torrent_checker.torrent_checker.tracker_manager import MAX_TRACKER_FAILURES, TrackerManager from tribler_core.config.tribler_config import TriblerConfig -from tribler_core.notifier import Notifier -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier from tribler_core.utilities.tracker_utils import MalformedTrackerURLException from tribler_core.utilities.unicode import hexlify from tribler_core.utilities.utilities import has_bep33_support, is_valid_url @@ -296,12 +296,11 @@ def on_torrent_health_check_completed(self, infohash, result): final_response = {} if not result or not isinstance(result, list): self._logger.info("Received invalid torrent checker result") - self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value, - {"infohash": hexlify(infohash), - "num_seeders": 0, - "num_leechers": 0, - "last_tracker_check": int(time.time()), - "health": "updated"}) + self.notifier[notifications.channel_entity_updated]({"infohash": hexlify(infohash), + "num_seeders": 0, + "num_leechers": 0, + "last_tracker_check": int(time.time()), + "health": "updated"}) return final_response torrent_update_dict = {'infohash': infohash, 'seeders': 0, 'leechers': 0, 'last_check': int(time.time())} @@ -328,12 +327,11 @@ def on_torrent_health_check_completed(self, infohash, result): self.update_torrents_checked(torrent_update_dict) # TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex! - self.notifier.notify(NTFY.CHANNEL_ENTITY_UPDATED.value, - {"infohash": hexlify(infohash), - "num_seeders": torrent_update_dict["seeders"], - "num_leechers": torrent_update_dict["leechers"], - "last_tracker_check": torrent_update_dict["last_check"], - "health": "updated"}) + self.notifier[notifications.channel_entity_updated]({"infohash": hexlify(infohash), + "num_seeders": torrent_update_dict["seeders"], + "num_leechers": torrent_update_dict["leechers"], + "last_tracker_check": torrent_update_dict["last_check"], + "health": "updated"}) return final_response @task diff --git a/src/tribler-core/tribler_core/components/tunnel/community/tunnel_community.py b/src/tribler-core/tribler_core/components/tunnel/community/tunnel_community.py index 06840de0a71..1d44f37a180 100644 --- a/src/tribler-core/tribler_core/components/tunnel/community/tunnel_community.py +++ b/src/tribler-core/tribler_core/components/tunnel/community/tunnel_community.py @@ -32,6 +32,7 @@ from ipv8.types import Address from ipv8.util import succeed +from tribler_core import notifications from tribler_core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData from tribler_core.components.socks_servers.socks5.server import Socks5Server from tribler_core.components.tunnel.community.caches import BalanceRequestCache, HTTPRequestCache @@ -52,7 +53,6 @@ DLSTATUS_METADATA, DLSTATUS_SEEDING, DLSTATUS_STOPPED, - NTFY, ) from tribler_core.utilities.unicode import hexlify @@ -384,7 +384,7 @@ def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destr # Send the notification if self.notifier: - self.notifier.notify(NTFY.TUNNEL_REMOVE, circuit, additional_info) + self.notifier[notifications.circuit_removed](circuit, additional_info) # Ignore circuits that are closing so we do not payout again if we receive a destroy message. if circuit.state != CIRCUIT_STATE_CLOSING and self.bandwidth_community: @@ -429,12 +429,12 @@ async def remove_relay(self, circuit_id, additional_info='', remove_now=False, d if self.notifier: for removed_relay in removed_relays: - self.notifier.notify(NTFY.TUNNEL_REMOVE, removed_relay, additional_info) + self.notifier[notifications.circuit_removed](removed_relay, additional_info) def remove_exit_socket(self, circuit_id, additional_info='', remove_now=False, destroy=False): if circuit_id in self.exit_sockets and self.notifier: exit_socket = self.exit_sockets[circuit_id] - self.notifier.notify(NTFY.TUNNEL_REMOVE, exit_socket, additional_info) + self.notifier[notifications.circuit_removed](exit_socket, additional_info) self.clean_from_slots(circuit_id) diff --git a/src/tribler-core/tribler_core/components/tunnel/tests/test_full_session/test_tunnel_community.py b/src/tribler-core/tribler_core/components/tunnel/tests/test_full_session/test_tunnel_community.py index d3875660165..011ffeb445a 100644 --- a/src/tribler-core/tribler_core/components/tunnel/tests/test_full_session/test_tunnel_community.py +++ b/src/tribler-core/tribler_core/components/tunnel/tests/test_full_session/test_tunnel_community.py @@ -7,7 +7,7 @@ from traceback import print_exception from typing import List -from asynctest import Mock +from asynctest import MagicMock from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT from ipv8.peer import Peer @@ -126,9 +126,9 @@ async def create_tunnel_community(temp_path_factory, comm_config: TunnelCommunit dlmgr = DownloadManager(state_dir=temp_path_factory.mktemp('state_dir'), config=dlmgr_settings, - peer_mid=Mock(), + peer_mid=MagicMock(), socks_listen_ports=socks_ports, - notifier=Mock()) + notifier=MagicMock()) dlmgr.metadata_tmpdir = temp_path_factory.mktemp('metadata_tmpdir') comm_config = comm_config or TunnelCommunitySettings() diff --git a/src/tribler-core/tribler_core/components/version_check/tests/test_versioncheck.py b/src/tribler-core/tribler_core/components/version_check/tests/test_versioncheck.py index 806d025c2bb..b815f360838 100644 --- a/src/tribler-core/tribler_core/components/version_check/tests/test_versioncheck.py +++ b/src/tribler-core/tribler_core/components/version_check/tests/test_versioncheck.py @@ -1,6 +1,6 @@ import json from asyncio import sleep -from unittest.mock import Mock +from unittest.mock import MagicMock from aiohttp import web @@ -18,7 +18,7 @@ def make_platform_mock(): - platform_mock = Mock() + platform_mock = MagicMock() platform_mock.machine = lambda: 'Something64' platform_mock.system = lambda: 'OsName' platform_mock.release = lambda: '123' @@ -37,7 +37,7 @@ async def fixture_version_check_manager(free_port): prev_urls = versioncheck_manager.VERSION_CHECK_URLS versioncheck_manager.platform = make_platform_mock() versioncheck_manager.VERSION_CHECK_URLS = [f"http://localhost:{free_port}"] - version_check_manager = VersionCheckManager(notifier=Mock()) + version_check_manager = VersionCheckManager(notifier=MagicMock()) try: yield version_check_manager finally: @@ -183,7 +183,7 @@ async def test_fallback_on_multiple_urls(free_port, version_check_manager, versi def test_useragent_string(): - platform = Mock() + platform = MagicMock() platform.machine = lambda: 'AMD64' platform.system = lambda: 'Windows' platform.release = lambda: '10' diff --git a/src/tribler-core/tribler_core/components/version_check/versioncheck_manager.py b/src/tribler-core/tribler_core/components/version_check/versioncheck_manager.py index e68c7a992f7..268c25fca15 100644 --- a/src/tribler-core/tribler_core/components/version_check/versioncheck_manager.py +++ b/src/tribler-core/tribler_core/components/version_check/versioncheck_manager.py @@ -6,8 +6,8 @@ from ipv8.taskmanager import TaskManager -from tribler_core.notifier import Notifier -from tribler_core.utilities.simpledefs import NTFY +from tribler_core import notifications +from tribler_core.utilities.notifier import Notifier from tribler_core.version import version_id VERSION_CHECK_URLS = [f'https://release.tribler.org/releases/latest?current={version_id}', # Tribler Release API @@ -62,7 +62,7 @@ async def check_new_version_api(self, version_check_url): response_dict = await response.json(content_type=None) version = response_dict['name'][1:] if LooseVersion(version) > LooseVersion(version_id): - self.notifier.notify(NTFY.TRIBLER_NEW_VERSION.value, version) + self.notifier[notifications.tribler_new_version](version) return True return False diff --git a/src/tribler-core/tribler_core/components/watch_folder/tests/test_watch_folder.py b/src/tribler-core/tribler_core/components/watch_folder/tests/test_watch_folder.py index 7c2f9a004ec..4f54f7565b9 100644 --- a/src/tribler-core/tribler_core/components/watch_folder/tests/test_watch_folder.py +++ b/src/tribler-core/tribler_core/components/watch_folder/tests/test_watch_folder.py @@ -1,7 +1,7 @@ import os import shutil -from asynctest import Mock +from asynctest import MagicMock import pytest @@ -11,7 +11,7 @@ @pytest.fixture async def watcher_fixture(tmp_path): - watch = WatchFolder(watch_folder_path=tmp_path, download_manager=Mock(), notifier=Mock()) + watch = WatchFolder(watch_folder_path=tmp_path, download_manager=MagicMock(), notifier=MagicMock()) yield watch await watch.stop() diff --git a/src/tribler-core/tribler_core/components/watch_folder/watch_folder.py b/src/tribler-core/tribler_core/components/watch_folder/watch_folder.py index 890d80da31a..494cb0e8c29 100644 --- a/src/tribler-core/tribler_core/components/watch_folder/watch_folder.py +++ b/src/tribler-core/tribler_core/components/watch_folder/watch_folder.py @@ -4,11 +4,11 @@ from ipv8.taskmanager import TaskManager +from tribler_core import notifications from tribler_core.components.libtorrent.download_manager.download_manager import DownloadManager from tribler_core.components.libtorrent.torrentdef import TorrentDef -from tribler_core.notifier import Notifier from tribler_core.utilities import path_util -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier WATCH_FOLDER_CHECK_INTERVAL = 10 @@ -42,7 +42,7 @@ def cleanup_torrent_file(self, root, name): self._logger.warning(f'Cant rename the file to {path}. Exception: {e}') self._logger.warning("Watch folder - corrupt torrent file %s", name) - self.notifier.notify(NTFY.WATCH_FOLDER_CORRUPT_FILE.value, name) + self.notifier[notifications.watch_folder_corrupt_file](name) def check_watch_folder(self): if not self.watch_folder.is_dir(): diff --git a/src/tribler-core/tribler_core/conftest.py b/src/tribler-core/tribler_core/conftest.py index ae10944d862..2d51b26fcfd 100644 --- a/src/tribler-core/tribler_core/conftest.py +++ b/src/tribler-core/tribler_core/conftest.py @@ -1,7 +1,7 @@ import asyncio import os from pathlib import Path -from unittest.mock import Mock +from unittest.mock import MagicMock from aiohttp import web @@ -80,7 +80,7 @@ def enable_ipv8(tribler_config): @pytest.fixture def mock_dlmgr(state_dir): - dlmgr = Mock() + dlmgr = MagicMock() dlmgr.config = LibtorrentSettings() dlmgr.shutdown = lambda: succeed(None) checkpoints_dir = state_dir / 'dlcheckpoints' @@ -112,7 +112,7 @@ async def video_seeder(tmp_path_factory, video_tdef): dlmgr = DownloadManager( config=config, state_dir=seeder_state_dir, - notifier=Mock(), + notifier=MagicMock(), peer_mid=b"0000") dlmgr.metadata_tmpdir = tmp_path_factory.mktemp('metadata_tmpdir') dlmgr.initialize() @@ -255,7 +255,7 @@ async def test_download(mock_dlmgr, test_tdef): @pytest.fixture def mock_lt_status(): - lt_status = Mock() + lt_status = MagicMock() lt_status.upload_rate = 123 lt_status.download_rate = 43 lt_status.total_upload = 100 @@ -296,7 +296,7 @@ async def download_manager(tmp_path_factory): download_manager = DownloadManager( config=config, state_dir=tmp_path_factory.mktemp('state_dir'), - notifier=Mock(), + notifier=MagicMock(), peer_mid=b"0000") download_manager.metadata_tmpdir = tmp_path_factory.mktemp('metadata_tmpdir') download_manager.initialize() diff --git a/src/tribler-core/tribler_core/notifications.py b/src/tribler-core/tribler_core/notifications.py new file mode 100644 index 00000000000..9b25a74effe --- /dev/null +++ b/src/tribler-core/tribler_core/notifications.py @@ -0,0 +1,89 @@ +from typing import Optional + +from ipv8.messaging.anonymization.tunnel import Circuit + + +# pylint: disable=unused-argument + + +def torrent_finished(infohash: str, name: str, hidden: bool): + # A torrent has finished downloading. Contains the infohash and the name of the torrent + ... + + +def tribler_shutdown_state(state: str): + # Tribler is going to shutdown + ... + + +def tribler_new_version(version: str): + # A new version of Tribler is available + ... + + +def channel_discovered(data: dict): + # Tribler has discovered a new channel. Contains the channel data + ... + + +def remote_query_results(data: dict): + # Remote GigaChannel search results were received by Tribler. Contains received entries + ... + + +def circuit_removed(circuit: Circuit, additional_info: str): + # Tribler tunnel circuit has been removed (notification to Core) + ... + + +def tunnel_removed(circuit_id: int, bytes_up: int, bytes_down: int, uptime: float, additional_info: str = ''): + # Tribler tunnel circuit has been removed (notification to GUI) + ... + + +def watch_folder_corrupt_file(file_name: str): + # A corrupt .torrent file in the watch folder is found. Contains the name of the corrupt torrent file + ... + + +def channel_entity_updated(channel_update_dict: dict): + # Information about some torrent has been updated (e.g. health). Contains updated torrent data + ... + + +def low_space(disk_usage_data: dict): + # Tribler is low on disk space for storing torrents + ... + + +def events_start(public_key: str, version: str): + ... + + +def tribler_exception(error: dict): + ... + + +def popularity_community_unknown_torrent_added(): + ... + + +def report_config_error(error): + # Report config error on startup + ... + + +def peer_disconnected(peer_id: bytes): + ... + + +def tribler_torrent_peer_update(peer_id: bytes, infohash: bytes, balance: int): + ... + + +def torrent_metadata_added(metadata: dict): + ... + + +def new_torrent_metadata_created(infohash: Optional[bytes] = None, title: Optional[str] = None): + ... diff --git a/src/tribler-core/tribler_core/notifier.py b/src/tribler-core/tribler_core/notifier.py deleted file mode 100644 index 3737d869d3a..00000000000 --- a/src/tribler-core/tribler_core/notifier.py +++ /dev/null @@ -1,56 +0,0 @@ -import logging -from asyncio import get_event_loop -from collections import defaultdict -from typing import Callable, Dict - - -class Notifier: - def __init__(self): - self.logger = logging.getLogger(self.__class__.__name__) - - # we use type Dict for `self.observers` for providing the deterministic order of callbacks - # Therefore `value: bool` here is unnecessary, and it just newer use. - self.observers: Dict[str, Dict[Callable, bool]] = defaultdict(dict) - - # @ichorid: - # We have to note the event loop reference, because when we call "notify" from an external thread, - # we don't know anything about the existence of the event loop, and get_event_loop() can't find - # the original event loop from an external thread. - # We remember the event loop from the thread that runs the Notifier - # to be able to schedule notifications from external threads - self._loop = get_event_loop() - - def add_observer(self, topic: str, callback: Callable): - """ Add the observer for the topic. - Order of the added callbacks will be the same order for the calling the callbacks. - `add_observer` doesn't support duplicated callbacks. - """ - self.logger.debug(f"Add observer topic {topic}") - self.observers[topic][callback] = True - - def remove_observer(self, topic: str, callback: Callable): - """ Remove the observer from the topic. In the case of a missed callback no error will be raised. - """ - self.logger.debug(f"Remove observer topic {topic}") - self.observers[topic].pop(callback, None) - - def notify(self, topic: str, *args, **kwargs): - """ Notify all observers about the topic. - - Each call of observer's callback is isolated and an exception that could - occur in this call will not affect all other calls. - """ - try: - def _notify(_callback): - _callback(*args, **kwargs) - - for callback in list(self.observers[topic]): - # @ichorid: - # We have to call the notifier callbacks through call_soon_threadsafe - # because the notify method could have been called from a non-reactor thread - self._loop.call_soon_threadsafe(_notify, callback) - except RuntimeError as e: - # Raises RuntimeError if called on a loop that’s been closed. - # This can happen on a secondary thread when the main application is shutting down. - # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe - self.logger.warning(e) diff --git a/src/tribler-core/tribler_core/start_core.py b/src/tribler-core/tribler_core/start_core.py index acdb057d269..32947dc0785 100644 --- a/src/tribler-core/tribler_core/start_core.py +++ b/src/tribler-core/tribler_core/start_core.py @@ -6,6 +6,7 @@ import sys from typing import List +from tribler_core import notifications from tribler_core.check_os import ( check_and_enable_code_tracing, set_process_priority, @@ -36,7 +37,6 @@ from tribler_core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy from tribler_core.upgrade.version_manager import VersionHistory from tribler_core.utilities.process_checker import ProcessChecker -from tribler_core.utilities.simpledefs import NTFY logger = logging.getLogger(__name__) CONFIG_FILE_NAME = 'triblerd.conf' @@ -97,13 +97,13 @@ async def core_session(config: TriblerConfig, components: List[Component]): async with session.start() as session: # If there is a config error, report to the user via GUI notifier if config.error: - session.notifier.notify(NTFY.REPORT_CONFIG_ERROR.value, config.error) + session.notifier[notifications.report_config_error](config.error) # SHUTDOWN await session.shutdown_event.wait() if not config.gui_test_mode: - session.notifier.notify(NTFY.TRIBLER_SHUTDOWN_STATE.value, "Saving configuration...") + session.notifier[notifications.tribler_shutdown_state]("Saving configuration...") config.write() diff --git a/src/tribler-core/tribler_core/tests/test_notifier.py b/src/tribler-core/tribler_core/tests/test_notifier.py deleted file mode 100644 index 449249ad839..00000000000 --- a/src/tribler-core/tribler_core/tests/test_notifier.py +++ /dev/null @@ -1,111 +0,0 @@ -import asyncio -from unittest.mock import Mock - -import pytest - -from tribler_core.notifier import Notifier - -# pylint: disable=redefined-outer-name, protected-access - -@pytest.fixture -def notifier(): - return Notifier() - - -class TestCallback: - def __init__(self, side_effect=None): - self.callback_has_been_called = False - self.callback_has_been_called_with_args = None - self.callback_has_been_called_with_kwargs = None - self.side_effect = side_effect - self.event = asyncio.Event() - - def callback(self, *args, **kwargs): - self.callback_has_been_called_with_args = args - self.callback_has_been_called_with_kwargs = kwargs - self.callback_has_been_called = True - if self.side_effect: - raise self.side_effect() - - self.event.set() - - -@pytest.mark.asyncio -async def test_notifier_add_observer(notifier: Notifier): - def callback(): - ... - - # test that add observer stores topics and callbacks as a set to prevent duplicates - notifier.add_observer('topic', callback) - notifier.add_observer('topic', callback) - - assert len(notifier.observers['topic']) == 1 - - -@pytest.mark.asyncio -async def test_notifier_remove_nonexistent_observer(notifier: Notifier): - # test that `remove_observer` don't crash in case of calling to remove non existed topic/callback - notifier.remove_observer('nonexistent', lambda: None) - assert not notifier.observers['nonexistent'] - - -@pytest.mark.asyncio -async def test_notifier_remove_observer(notifier: Notifier): - def callback1(): - ... - - def callback2(): - ... - - notifier.add_observer('topic', callback1) - notifier.add_observer('topic', callback2) - - notifier.remove_observer('topic', callback1) - assert notifier.observers['topic'] == {callback2: True} - - -@pytest.mark.timeout(1) -@pytest.mark.asyncio -async def test_notify(notifier: Notifier): - # test that notify works as expected - normal_callback = TestCallback() - - notifier.add_observer('topic', normal_callback.callback) - notifier.notify('topic', 'arg', kwarg='value') - - # wait for the callback - await normal_callback.event.wait() - assert normal_callback.callback_has_been_called - assert normal_callback.callback_has_been_called_with_args == ('arg',) - assert normal_callback.callback_has_been_called_with_kwargs == {'kwarg': 'value'} - - -@pytest.mark.asyncio -async def test_notify_with_exception(notifier: Notifier): - # test that notify works as expected even if one of callbacks will raise an exception - - normal_callback = TestCallback() - side_effect_callback = TestCallback(ValueError) - - notifier.add_observer('topic', side_effect_callback.callback) - notifier.add_observer('topic', normal_callback.callback) - notifier.add_observer('topic', side_effect_callback.callback) - - notifier.notify('topic') - - # wait - await asyncio.sleep(1) - - assert normal_callback.callback_has_been_called - assert side_effect_callback.callback_has_been_called - - -@pytest.mark.asyncio -async def test_notify_call_soon_threadsafe_with_exception(notifier: Notifier): - notifier.logger = Mock() - notifier._loop = Mock(call_soon_threadsafe=Mock(side_effect=RuntimeError)) - - notifier.add_observer('topic', lambda: ...) - notifier.notify('topic') - - notifier.logger.warning.assert_called_once() diff --git a/src/tribler-core/tribler_core/upgrade/tests/test_upgrader.py b/src/tribler-core/tribler_core/upgrade/tests/test_upgrader.py index ea6bc219455..37a38696781 100644 --- a/src/tribler-core/tribler_core/upgrade/tests/test_upgrader.py +++ b/src/tribler-core/tribler_core/upgrade/tests/test_upgrader.py @@ -12,7 +12,6 @@ from tribler_core.components.metadata_store.db.orm_bindings.channel_metadata import CHANNEL_DIR_NAME_LENGTH from tribler_core.components.metadata_store.db.store import CURRENT_DB_VERSION, MetadataStore from tribler_core.components.tag.db.tag_db import TagDatabase -from tribler_core.notifier import Notifier from tribler_core.tests.tools.common import TESTS_DATA_DIR from tribler_core.upgrade.db8_to_db10 import calc_progress from tribler_core.upgrade.upgrade import TriblerUpgrader, cleanup_noncompliant_channel_torrents @@ -43,11 +42,6 @@ def upgrader(state_dir, channels_dir, trustchain_keypair): return TriblerUpgrader(state_dir, channels_dir, trustchain_keypair) -@pytest.fixture -def notifier(): - return Notifier() - - @pytest.fixture def mds_path(state_dir): return state_dir / 'sqlite/metadata.db' diff --git a/src/tribler-core/tribler_core/utilities/notifier.py b/src/tribler-core/tribler_core/utilities/notifier.py new file mode 100644 index 00000000000..8c9022fbd62 --- /dev/null +++ b/src/tribler-core/tribler_core/utilities/notifier.py @@ -0,0 +1,252 @@ +import inspect +import logging +from asyncio import AbstractEventLoop +from collections import defaultdict +from threading import Lock +from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, cast + + +FuncT = TypeVar("FuncT", bound=Callable[..., Any]) + + +class NotifierError(Exception): + pass + + +class Notifier: + """ + Allows communication between different Tribler modules and components. + + With Notifier, you can subscribe observer to a topic and receive notifications. The topic is a function, + and the observer should be a callable with the same signature. Notifier is statically typed - if an observer + has an incorrect signature or notification is called with wrong arguments, you should get a TypeError. + PyCharm also should highlight incorrect observer registration and incorrect topic invocation. + + An example of usage: + + First, you need to create a Notifier instance. You can pass an event loop if the notifier should be able + to process notifications asynchronously. + + >>> import asyncio + >>> notifier = Notifier(loop=asyncio.get_event_loop()) + + A topic is a function with an arbitrary signature. Usually, it has an empty body (a pass statement) but can include + a debug code as well. It is called when notification is sent to observers. + + >>> def topic(foo: int, bar: str): + ... print("Notification is sent!") + ... + + An observer should have the same signature as the topic (the return type is ignored for convenience). It may be a + bound method of an object, in that case the `self` argument is also ignored. + + >>> def observer(foo: int, bar: str): + ... print("Observer called with", foo, bar) + ... + >>> def second_observer(foo: int, bar: str): + ... print("Second observer called with", foo, bar) + ... + + To connect an observer to a specific notification, you can use the `add_observer` method. The method checks that + the topic and the observer have the same signature. + + >>> notifier.add_observer(topic, observer) + + Observers can be registered as synchronous or asynchronous. Synchronous observers are called immediately, + and asynchronous observers are called in the subsequent event loop iterations. By default, the observer + is asynchronous if the notifier was initialized with an event loop. You can explicitly specify if the observer + is synchronous or not: + + >>> notifier.add_observer(topic, second_observer, synchronous=True) + + To call observers for a specific topic in a type-safe manner, use square brackets syntax. If you are not aware + what arguments should be used for specific topic, in IDE you can click on the topic function name and jump to the + function signature. + + >>> notifier[topic](123, "abc") + >>> notifier[topic](foo=123, bar="abc") + + When you invoke a notifier, all observers for the topic receive notification in the order as they were registered + (synchronous observers first, then asynchronous). + + As an alternative, you can use the `notify` method, but without static type checks: + + >>> notifier.notify(topic, foo=123, bar="abc") + + The last way to invoke notification is by a topic function name. It can be useful when writing generic code. + To be able to call the topic in this manner, it should have at least one observer: + + >>> notifier.notify_by_topic_name("topic", foo=123, bar="abc") + + You can also register a generic observer, receiving notifications for any topic. It will receive the topic + as a first argument. When notification is called, generic observers are called before topic-specific observers + in the same order as they were registered: + + >>> def generic_observer(topic, *args, **kwargs): + ... print("Generic observer called for", topic.__name__, "with", args, kwargs) + ... + >>> notifier.add_generic_observer(generic_observer) + + You can remove an observer or generic observer by calling the corresponding method: + + >>> notifier.remove_observer(observer) + >>> notifier.remove_generic_observer(generic_observer) + + In Tribler, both Core and GUI have notifiers. Tribler uses generic observer to retranslate a subset of topics + from Core to GUI. Core notifier is attached to the event loop and processes most topics asynchronously. + GUI does not have an event loop, so GUI notifier processes retranslated topics synchronously. Basically, GUI + notifier fires corresponding Qt signal for each topic. + + EventsEndpoint in Core and EventsRequestManager in GUI implement this logic of retranslation. EventsEndpoint adds + a generic observer that listens to all topics, serializes a subset of notification calls to JSON, and sends them + to GUI. EventRequestManager receives messages, deserializes arguments and calls `notifier.notify_by_topic_name`. + """ + + def __init__(self, loop: AbstractEventLoop = None): + self.lock = Lock() + self.logger = logging.getLogger(self.__class__.__name__) + + self.topics_by_name: Dict[str, Callable] = {} + # We use the dict type for `self.observers` and `set.generic_observers` instead of the set type to provide + # the deterministic ordering of callbacks. In Python, dictionaries are ordered while sets aren't. + # Therefore, `value: bool` here is unnecessary and is never used. + self.topics: Dict[Callable, Dict[Callable, bool]] = defaultdict(dict) + self.generic_observers: Dict[Callable, bool] = {} + self.interceptors: Dict[Callable, bool] = {} + + # @ichorid: + # We have to store the event loop in constructor. Otherwise, get_event_loop() cannot find + # the original event loop when scheduling notifications from the external thread. + self.loop = loop + + def add_observer(self, topic: FuncT, observer: FuncT, synchronous: Optional[bool] = None): + """ Add the observer for the topic. + Each callback will be added no more than once. Callbacks are called in the same order as they were added. + + topic: + A callable which represents a "topic" to subscribe + + observer: + A callable which will be actually called when notification is sent to the topic + + synchronous: + A strategy of how to call the observer. If True, + + + """ + synchronous = self._check_synchronous(synchronous) + empty = inspect._empty # pylint: disable=protected-access + # ignore types of return values, as during the notification call the return values are ignored + topic_signature = inspect.signature(topic).replace(return_annotation=empty) + callback_signature = inspect.signature(observer).replace(return_annotation=empty) + if topic_signature != callback_signature: + raise TypeError(f'Cannot add observer {observer!r} to topic "{topic.__name__}": ' + f'the callback signature {callback_signature} does not match ' + f'the topic signature {topic_signature}') + + if inspect.iscoroutinefunction(topic): + raise TypeError(f"Topic cannot be a coroutine function. Got: {topic!r}") + + if inspect.iscoroutinefunction(observer): + raise TypeError(f"Observer cannot be a coroutine function. Got: {observer!r}") + + if topic is observer: + raise TypeError(f"Topic and observer cannot be the same function. Got: {topic!r}") + + self.logger.debug(f"Add observer topic {topic.__name__}") + with self.lock: + topic_name: str = topic.__name__ + prev_topic = self.topics_by_name.setdefault(topic_name, topic) + if prev_topic is not topic: + raise NotifierError(f'Cannot register topic {topic!r} because topic name {topic_name} is already taken ' + f'by another topic {prev_topic!r}') + prev_synchronous = self.topics[topic].setdefault(observer, synchronous) + if prev_synchronous != synchronous: + raise NotifierError('Cannot register the same observer with a different value of `synchronous` option') + + def _check_synchronous(self, synchronous: Optional[bool]) -> bool: + if not any(synchronous is option for option in (True, False, None)): + raise TypeError(f"`synchronous` option may be True, False or None. Got: {synchronous!r}") + + if synchronous is False and self.loop is None: + raise TypeError("synchronous=False option cannot be specified for a notifier without an event loop") + + if synchronous is None: + synchronous = not self.loop + + return synchronous + + def remove_observer(self, topic: FuncT, observer: FuncT): + """ Remove the observer from the topic. In the case of a missed callback no error will be raised. + """ + with self.lock: + observers = self.topics[topic] + observers.pop(observer, None) + comment = "" if not observers else f" (it still has {len(observers)} observers)" + self.logger.debug(f"Remove observer {observer!r} from topic {topic.__name__}" + comment) + + def add_generic_observer(self, observer: Callable, synchronous: Optional[bool] = None): + self.logger.debug(f"Add generic observer {observer!r}") + with self.lock: + self.generic_observers[observer] = self._check_synchronous(synchronous) + + def remove_generic_observer(self, observer: Callable): + with self.lock: + self.generic_observers.pop(observer, None) + self.logger.debug(f"Remove generic observer {observer!r}") + + def __getitem__(self, topic: FuncT) -> FuncT: + def wrapper(*args, **kwargs): + self.notify(topic, *args, **kwargs) + return cast(FuncT, wrapper) + + def notify_by_topic_name(self, topic_name: str, *args, **kwargs): + with self.lock: + topic = self.topics_by_name.get(topic_name) + if topic is None: + self.logger.warning(f'Topic with name `{topic_name}` not found') + else: + self.notify(topic, *args, **kwargs) + + def notify(self, topic: Callable, *args, **kwargs): + """ Notify all observers about the topic. + + Сan be called from any thread. Observers will be called from the reactor thread during the next iteration + of the event loop. An exception when an observer is invoked will not affect other observers. + """ + self.logger.debug(f"Notification for topic {topic.__name__}") + + topic(*args, **kwargs) + + with self.lock: + generic_observers: List[Tuple[Callable, bool]] = list(self.generic_observers.items()) + observers: List[Tuple[Callable, bool]] = list(self.topics[topic].items()) + + generic_observer_args = (topic,) + args + for observer, synchronous in generic_observers: + if synchronous: + self._notify(topic, observer, generic_observer_args, kwargs) + else: + self._notify_threadsafe(topic, observer, generic_observer_args, kwargs) + + for observer, synchronous in observers: + if synchronous: + self._notify(topic, observer, args, kwargs) + else: + self._notify_threadsafe(topic, observer, args, kwargs) + + def _notify_threadsafe(self, topic: Callable, observer: Callable, args: Tuple, kwargs: Dict[str, Any]): + try: + self.loop.call_soon_threadsafe(self._notify, topic, observer, args, kwargs) + except RuntimeError as e: + # Raises RuntimeError if called on a loop that’s been closed. + # This can happen on a secondary thread when the main application is shutting down. + # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.call_soon_threadsafe + self.logger.warning(e) + + def _notify(self, topic: Callable, observer: Callable, args: tuple, kwargs: dict): + self.logger.debug(f"Calling observer {observer!r} for topic {topic.__name__}") + try: + observer(*args, **kwargs) + except Exception as e: # pylint: disable=broad-except + self.logger.exception(e) diff --git a/src/tribler-core/tribler_core/utilities/tests/test_notifier.py b/src/tribler-core/tribler_core/utilities/tests/test_notifier.py new file mode 100644 index 00000000000..cacb2f076a6 --- /dev/null +++ b/src/tribler-core/tribler_core/utilities/tests/test_notifier.py @@ -0,0 +1,381 @@ +import asyncio +from unittest.mock import MagicMock + +import pytest + +from tribler_core.utilities.notifier import Notifier, NotifierError + + +# pylint: disable=unused-argument + + +def test_add_remove_observer(): + notifier = Notifier() + + # A topic should be callable + with pytest.raises(TypeError, match=r"^'topic' is not a callable object$"): + notifier.add_observer('topic', lambda x: x) + + def topic1(x: int): + pass + + # An observer should be callable as well + with pytest.raises(TypeError, match=r"^'observer' is not a callable object$"): + notifier.add_observer(topic1, "observer") + + def observer1(): + pass + + # Topic and observer functions should have the same number of arguments + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(\) does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer1) + + def observer2(x): + pass + + # Topic and observer functions should have the same argument types + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(x\) does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer2) + + def observer3(x: str): + pass + + # Topic and observer functions should have the same argument types + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(x: str\) ' + r'does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer3) + + def observer4(y: int): + pass + + # Topic and observer functions should have the same argument names + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(y: int\) ' + r'does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer4) + + def observer5(x: int, y: int): + pass + + # Topic and observer functions should have the same number of arguments + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(x: int, y: int\) ' + r'does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer5) + + def observer6(x: int = None): + pass + + # Topic and observer functions should have the same argument defaults + with pytest.raises(TypeError, match=r'^Cannot add observer to topic "topic1": ' + r'the callback signature \(x: int = None\) ' + r'does not match the topic signature \(x: int\)$'): + notifier.add_observer(topic1, observer6) + + async def async1(x: int): + pass + + # Topic and observer cannot be async functions + with pytest.raises(TypeError, match=r'^Topic cannot be a coroutine function. Got: $'): + notifier.add_observer(async1, topic1) + + with pytest.raises(TypeError, match=r'^Observer cannot be a coroutine function. Got: $'): + notifier.add_observer(topic1, async1) + + with pytest.raises(TypeError, match=r'^Topic and observer cannot be the same function. Got: $'): + notifier.add_observer(topic1, topic1) + + + def observer7(x: int): + pass + + with pytest.raises(TypeError, match=r"^`synchronous` option may be True, False or None. Got: 1$"): + notifier.add_observer(topic1, observer7, synchronous=1) + + with pytest.raises(TypeError, match=r"^synchronous=False option cannot be specified " + r"for a notifier without an event loop$"): + notifier.add_observer(topic1, observer7, synchronous=False) + + assert not notifier.topics_by_name + assert not notifier.topics + assert not notifier.generic_observers + assert not notifier.interceptors + + # add first observer to topic1 + notifier.add_observer(topic1, observer7) + + assert notifier.topics_by_name == {'topic1': topic1} + assert notifier.topics == {topic1: {observer7: True}} + + # adding the same observer the second time should change nothing + notifier.add_observer(topic1, observer7) + assert notifier.topics == {topic1: {observer7: True}} + + def observer8(x: int): + pass + + # add second observer to topic1 + notifier.add_observer(topic1, observer8) + assert notifier.topics == {topic1: {observer7: True, observer8: True}} + + # generic observers and interceptors were not added + assert not notifier.generic_observers + assert not notifier.interceptors + + def topic2(x: int): + pass + + def observer9(x: int): + pass + + # no exception when removing an observer from non-registered topic + notifier.remove_observer(topic2, observer7) + + # no exception when removing a non-registered observer + notifier.remove_observer(topic1, observer9) + # we still has two observers for topic1 topic + assert notifier.topics == {topic1: {observer7: True, observer8: True}, topic2: {}} + + # remove the first observer from the topic1 topic + notifier.remove_observer(topic1, observer7) + assert notifier.topics == {topic1: {observer8: True}, topic2: {}} + + # remove last observer from the topic1 topic + notifier.remove_observer(topic1, observer8) + assert notifier.topics == {topic1: {}, topic2: {}} + + +def test_two_topics_with_the_same_name(): + notifier = Notifier() + + def topic1(x: int): + pass + + def observer1(x: int): + pass + + notifier.add_observer(topic1, observer1) + + def topic1(x: int): # pylint: disable=function-redefined # try to define another topic with the same name + pass + + def observer2(x: int): + pass + + with pytest.raises(NotifierError, match='^Cannot register topic <.*topic1.*> because topic name topic1 ' + 'is already taken by another topic <.*topic1.*>$'): + notifier.add_observer(topic1, observer2) + + +def test_notify(): + def topic_a(a: int, b: str): + pass + + def topic_b(x: int): + pass + + calls = [] + + def observer_a1(a: int, b: str): + calls.append(('a1', a, b)) + + def observer_a2(a: int, b: str): + calls.append(('a2', a, b)) + + def observer_b1(x: int): + calls.append(('b1', x)) + + def generic_1(*args, **kwargs): + calls.append((('generic1',) + args + (repr(kwargs),))) + + def generic_2(*args, **kwargs): + calls.append((('generic2',) + args + (repr(kwargs),))) + + notifier = Notifier() + notifier.add_observer(topic_a, observer_a1) # add an observer + notifier.add_observer(topic_a, observer_a1) # adding the same observer multiple times should affect nothing + notifier.add_generic_observer(generic_1) # add a generic observer + + with pytest.raises(TypeError): + notifier[topic_a](123) + + assert calls == [] + + notifier[topic_a](1, 'aaa') + + assert calls == [('generic1', topic_a, 1, 'aaa', '{}'), ('a1', 1, 'aaa')] + calls.clear() + + notifier.add_observer(topic_a, observer_a2) # add a second observer to the same topic + notifier.add_observer(topic_b, observer_b1) # observer to a different topic + notifier.add_generic_observer(generic_2) # a second generic observer + + notifier[topic_a](2, 'bbb') + + assert calls == [('generic1', topic_a, 2, 'bbb', '{}'), ('generic2', topic_a, 2, 'bbb', '{}'), ('a1', 2, 'bbb'), + ('a2', 2, 'bbb')] + calls.clear() + + notifier[topic_b](x=111) + + assert calls == [('generic1', topic_b, "{'x': 111}"), ('generic2', topic_b, "{'x': 111}"), ('b1', 111)] + calls.clear() + + notifier.logger.warning = MagicMock() + notifier.notify_by_topic_name('non_existent_topic', x=1, y=2) + notifier.logger.warning.assert_called_once_with('Topic with name `non_existent_topic` not found') + + notifier.notify_by_topic_name('topic_b', x=111) + + assert calls == [('generic1', topic_b, "{'x': 111}"), ('generic2', topic_b, "{'x': 111}"), ('b1', 111)] + calls.clear() + + notifier.remove_observer(topic_b, observer_b1) + notifier.remove_generic_observer(generic_1) + + notifier[topic_b](222) + + assert calls == [('generic2', topic_b, 222, '{}')] + + +async def test_notify_async(loop): + def topic_a(a: int, b: str): + pass + + def topic_b(x: int): + pass + + calls = [] + + def observer_a1(a: int, b: str): + calls.append(('a1', a, b)) + + def observer_a2(a: int, b: str): + calls.append(('a2', a, b)) + + def observer_b1(x: int): + calls.append(('b1', x)) + + def generic_1(*args, **kwargs): + calls.append((('generic1',) + args + (repr(kwargs),))) + + def generic_2(*args, **kwargs): + calls.append((('generic2',) + args + (repr(kwargs),))) + + notifier = Notifier(loop=loop) + notifier.add_observer(topic_a, observer_a1) # add an observer + notifier.add_observer(topic_a, observer_a1) # adding the same observer multiple times should affect nothing + notifier.add_generic_observer(generic_1) # add a generic observer + + # An attempt to add the same observer with different `synchronous` option value should raise an error + with pytest.raises(NotifierError, match=r'^Cannot register the same observer ' + r'with a different value of `synchronous` option$'): + notifier.add_observer(topic_a, observer_a1, synchronous=True) + + with pytest.raises(TypeError): + notifier[topic_a](123) + + assert calls == [] + + notifier[topic_a](1, 'aaa') + + await asyncio.sleep(0.1) + + assert set(calls) == {('generic1', topic_a, 1, 'aaa', '{}'), ('a1', 1, 'aaa')} + calls.clear() + + notifier.add_observer(topic_a, observer_a2) # add a second observer to the same topic + notifier.add_observer(topic_b, observer_b1) # observer to a different topic + notifier.add_generic_observer(generic_2) # a second generic observer + + notifier[topic_a](2, 'bbb') + + await asyncio.sleep(0.1) + + assert set(calls) == {('generic1', topic_a, 2, 'bbb', '{}'), ('generic2', topic_a, 2, 'bbb', '{}'), + ('a1', 2, 'bbb'), ('a2', 2, 'bbb')} + calls.clear() + + notifier[topic_b](x=111) + + await asyncio.sleep(0.1) + + assert set(calls) == {('generic1', topic_b, "{'x': 111}"), ('generic2', topic_b, "{'x': 111}"), ('b1', 111)} + calls.clear() + + notifier.remove_observer(topic_b, observer_b1) + notifier.remove_generic_observer(generic_1) + + notifier[topic_b](222) + + await asyncio.sleep(0.1) + + assert set(calls) == {('generic2', topic_b, 222, '{}')} + + +async def test_notify_with_exception(loop): + # test that notify works as expected even if one of callbacks will raise an exception + + def topic(x: int): + pass + + calls = [] + + def observer1(x: int): + calls.append(('observer1', x)) + + def observer2(x: int): + calls.append(('observer2', x)) + raise ZeroDivisionError + + def observer3(x: int): + calls.append(('observer3', x)) + + notifier = Notifier() # First, let's create a notifier without a loop specified + + notifier.add_observer(topic, observer1) + notifier.add_observer(topic, observer2) + notifier.add_observer(topic, observer3) + + notifier[topic](123) + + # when notifier is created without specifying a loop, it processes notifications synchronously + assert calls == [('observer1', 123), ('observer2', 123), ('observer3', 123)] + calls.clear() + + notifier = Notifier(loop=loop) # Now, let's create a notifier tied to a loop + + notifier.add_observer(topic, observer1) + notifier.add_observer(topic, observer2) + notifier.add_observer(topic, observer3) + + notifier[topic](123) + + # when notifier tied to a loop, it processes notifications asynchronously + assert calls == [] + + await asyncio.sleep(0.1) + + # now notifications should be processed + assert set(calls) == {('observer1', 123), ('observer2', 123), ('observer3', 123)} + + +def test_notify_call_soon_threadsafe_with_exception(loop): + notifier = Notifier(loop=loop) + + notifier.logger = MagicMock() + notifier.loop = MagicMock(call_soon_threadsafe=MagicMock(side_effect=RuntimeError)) + + def topic1(x: int): + pass + + def observer1(x: int): + pass + + notifier.add_observer(topic1, observer1) + notifier[topic1](123) + + notifier.logger.warning.assert_called_once() diff --git a/src/tribler-gui/tribler_gui/event_request_manager.py b/src/tribler-gui/tribler_gui/event_request_manager.py index a86f0d16e55..45cee811ead 100644 --- a/src/tribler-gui/tribler_gui/event_request_manager.py +++ b/src/tribler-gui/tribler_gui/event_request_manager.py @@ -5,8 +5,9 @@ from PyQt5.QtCore import QTimer, QUrl, pyqtSignal from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest +from tribler_core import notifications from tribler_core.components.reporter.reported_error import ReportedError -from tribler_core.utilities.simpledefs import NTFY +from tribler_core.utilities.notifier import Notifier from tribler_gui import gui_sentry_reporter from tribler_gui.exceptions import CoreConnectTimeoutError, CoreConnectionError @@ -48,32 +49,58 @@ def __init__(self, api_port, api_key, error_handler): self._logger = logging.getLogger(self.__class__.__name__) # This flag is used to prevent race condition when starting GUI tests self.tribler_started_flag = False - self.reactions_dict = { - NTFY.CHANNEL_ENTITY_UPDATED.value: self.node_info_updated.emit, - NTFY.TRIBLER_NEW_VERSION.value: lambda data: self.new_version_available.emit(data["version"]), - NTFY.CHANNEL_DISCOVERED.value: self.discovered_channel.emit, - NTFY.TORRENT_FINISHED.value: self.torrent_finished.emit, - NTFY.LOW_SPACE.value: self.low_storage_signal.emit, - NTFY.REMOTE_QUERY_RESULTS.value: self.received_remote_query_results.emit, - NTFY.TRIBLER_SHUTDOWN_STATE.value: self.tribler_shutdown_signal.emit, - NTFY.EVENTS_START.value: self.events_start_received, - NTFY.REPORT_CONFIG_ERROR.value: self.config_error_signal.emit, - NTFY.TRIBLER_EXCEPTION.value: lambda data: self.error_handler.core_error(ReportedError(**data)), - } self.connect_timer.setSingleShot(True) connect(self.connect_timer.timeout, self.connect) - def events_start_received(self, event_dict): - if event_dict["version"]: + self.notifier = notifier = Notifier() + notifier.add_observer(notifications.events_start, self.on_events_start) + notifier.add_observer(notifications.tribler_exception, self.on_tribler_exception) + notifier.add_observer(notifications.channel_entity_updated, self.on_channel_entity_updated) + notifier.add_observer(notifications.tribler_new_version, self.on_tribler_new_version) + notifier.add_observer(notifications.channel_discovered, self.on_channel_discovered) + notifier.add_observer(notifications.torrent_finished, self.on_torrent_finished) + notifier.add_observer(notifications.low_space, self.on_low_space) + notifier.add_observer(notifications.remote_query_results, self.on_remote_query_results) + notifier.add_observer(notifications.tribler_shutdown_state, self.on_tribler_shutdown_state) + notifier.add_observer(notifications.report_config_error, self.on_report_config_error) + + def on_events_start(self, public_key: str, version: str): + if version: self.tribler_started_flag = True - self.tribler_started.emit(event_dict["version"]) + self.tribler_started.emit(version) # if public key format will be changed, don't forget to change it # at the core side as well - public_key = event_dict["public_key"] if public_key: gui_sentry_reporter.set_user(public_key.encode('utf-8')) + def on_tribler_exception(self, error: dict): + self.error_handler.core_error(ReportedError(**error)) + + def on_channel_entity_updated(self, channel_update_dict: dict): + self.node_info_updated.emit(channel_update_dict) + + def on_tribler_new_version(self, version: str): + self.new_version_available.emit(version) + + def on_channel_discovered(self, data: dict): + self.discovered_channel.emit(data) + + def on_torrent_finished(self, infohash: str, name: str, hidden: bool): + self.torrent_finished.emit(dict(infohash=infohash, name=name, hidden=hidden)) + + def on_low_space(self, disk_usage_data: dict): + self.low_storage_signal.emit(disk_usage_data) + + def on_remote_query_results(self, data: dict): + self.received_remote_query_results.emit(data) + + def on_tribler_shutdown_state(self,state: str): + self.tribler_shutdown_signal.emit(state) + + def on_report_config_error(self, error): + self.config_error_signal.emit(error) + def on_error(self, error, reschedule_on_err): if error == QNetworkReply.ConnectionRefusedError: self._logger.debug("Tribler Core refused connection, retrying...") @@ -109,13 +136,11 @@ def on_read_data(self): if len(received_events) > 100: # Only buffer the last 100 events received_events.pop() - event_type, event = json_dict.get("type"), json_dict.get("event") - reaction = self.reactions_dict.get(event_type) - if reaction: - if event: - reaction(event) - else: - reaction() + topic_name = json_dict.get("topic", "noname") + args = json_dict.get("args", []) + kwargs = json_dict.get("kwargs", {}) + self.notifier.notify_by_topic_name(topic_name, *args, **kwargs) + self.current_event_string = "" def on_finished(self): diff --git a/src/tribler-gui/tribler_gui/tribler_window.py b/src/tribler-gui/tribler_gui/tribler_window.py index d6904d9d1b8..db344f22f40 100644 --- a/src/tribler-gui/tribler_gui/tribler_window.py +++ b/src/tribler-gui/tribler_gui/tribler_window.py @@ -419,7 +419,7 @@ def delete_tray_icon(self): logging.debug("Tray icon already removed, no further deletion necessary.") self.tray_icon = None - def on_low_storage(self, _): + def on_low_storage(self, disk_usage_data): """ Dealing with low storage space available. First stop the downloads and the core manager and ask user to user to make free space. From 3565b57cb88f8a246e4e5348611cbbd9aecb0178 Mon Sep 17 00:00:00 2001 From: Alexander Kozlovsky Date: Mon, 7 Feb 2022 14:16:35 +0100 Subject: [PATCH 2/2] Make processing of `new_torrent_metadata_created` notification synchronous --- .../tribler_core/components/tag/rules/tag_rules_processor.py | 3 ++- .../components/tag/rules/tests/test_tag_rules_processor.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py index c40f8295936..8e7166e3a40 100644 --- a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py +++ b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py @@ -36,7 +36,8 @@ def __init__(self, notifier: Notifier, db: TagDatabase, mds: MetadataStore, self.mds = mds self.batch_size = batch_size self.interval = interval - self.notifier.add_observer(notifications.new_torrent_metadata_created, self.process_torrent_title) + self.notifier.add_observer(notifications.new_torrent_metadata_created, self.process_torrent_title, + synchronous=True) @db_session def start(self): diff --git a/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py b/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py index 14ce6b140ca..02d082466a7 100644 --- a/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py +++ b/src/tribler-core/tribler_core/components/tag/rules/tests/test_tag_rules_processor.py @@ -23,7 +23,8 @@ def test_constructor(tag_rules_processor: TagRulesProcessor): assert tag_rules_processor.interval == TEST_INTERVAL m: MagicMock = tag_rules_processor.notifier.add_observer - m.assert_called_with(notifications.new_torrent_metadata_created, tag_rules_processor.process_torrent_title) + m.assert_called_with(notifications.new_torrent_metadata_created, tag_rules_processor.process_torrent_title, + synchronous=True) @patch.object(TagRulesProcessor, 'save_tags')