diff --git a/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py b/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py index 7e285abd8f4..727fbcda841 100644 --- a/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py +++ b/src/tribler/core/components/tunnel/tests/test_full_session/test_tunnel_community.py @@ -1,14 +1,15 @@ +import asyncio import logging import sys import time -from asyncio import Future, all_tasks, get_event_loop, sleep +from asyncio import all_tasks, get_event_loop, sleep from collections import defaultdict from itertools import permutations -from traceback import print_exception -from typing import List +from typing import List, Optional, Tuple +import pytest +from _pytest.tmpdir import TempPathFactory from asynctest import MagicMock - from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT from ipv8.peer import Peer from ipv8.test.messaging.anonymization import test_community @@ -16,10 +17,9 @@ from ipv8.test.mocking.exit_socket import MockTunnelExitSocket from ipv8.test.mocking.ipv8 import MockIPv8 -import pytest - # Pylint does not agree with the way pytest handles fixtures. # pylint: disable=W0613,W0621 +from tribler.core.components.libtorrent.download_manager.download import Download from tribler.core.components.libtorrent.download_manager.download_config import DownloadConfig from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager from tribler.core.components.libtorrent.settings import LibtorrentSettings @@ -31,77 +31,76 @@ from tribler.core.tests.tools.common import TESTS_DATA_DIR from tribler.core.utilities.simpledefs import DLSTATUS_DOWNLOADING, DLSTATUS_SEEDING, dlstatus_strings +logger = logging.getLogger("TunnelTests") + +@pytest.fixture def crash_on_error(): - def exception_handler(loop, context): - exc = context.get('exception') - print_exception(type(exc), exc, exc.__traceback__) + def exception_handler(_, context): + logger.exception(context.get('exception')) sys.exit(-1) get_event_loop().set_exception_handler(exception_handler) class ProxyFactory: - def __init__(self, temp_path_factory): - self.comms = [] + self.communities = [] self.temp_path_factory = temp_path_factory - async def get(self, exitnode=False, start_lt=False): - tunn_comm_config = TunnelCommunitySettings() - comm = await create_tunnel_community(self.temp_path_factory, tunn_comm_config, exitnode=exitnode, - start_lt=start_lt) - self.comms.append(comm) - return comm + async def get(self, exitnode=False, start_lt=False) -> TriblerTunnelCommunity: + tunnel_community_config = TunnelCommunitySettings() + community = await create_tunnel_community(self.temp_path_factory, tunnel_community_config, + exit_node_enable=exitnode, + start_lt=start_lt) + self.communities.append(community) + return community @pytest.fixture -async def logger(): - return logging.getLogger("TunnelTests") - - -@pytest.fixture -async def proxy_factory(tmp_path_factory): +async def proxy_factory(tmp_path_factory: TempPathFactory): factory = ProxyFactory(tmp_path_factory) yield factory - for comm in factory.comms: - if comm.dlmgr: - await comm.dlmgr.shutdown() - await comm.unload() + for community in factory.communities: + if community.dlmgr: + await community.dlmgr.shutdown() + await community.unload() test_community.global_dht_services = defaultdict(list) # Reset the global_dht_services variable @pytest.fixture -async def hidden_seeder_comm(proxy_factory, video_tdef): +async def hidden_seeder_comm(proxy_factory: ProxyFactory, video_tdef: TorrentDef) -> TriblerTunnelCommunity: # Also load the tunnel community in the seeder session - comm = await proxy_factory.get(start_lt=True) - comm.build_tunnels(1) + community = await proxy_factory.get(start_lt=True) + community.build_tunnels(1) - dscfg_seed = DownloadConfig() - dscfg_seed.set_dest_dir(TESTS_DATA_DIR) - dscfg_seed.set_hops(1) - upload = comm.dlmgr.start_download(tdef=video_tdef, config=dscfg_seed) + download_config = DownloadConfig() + download_config.set_dest_dir(TESTS_DATA_DIR) + download_config.set_hops(1) + upload = community.dlmgr.start_download(tdef=video_tdef, config=download_config) - def seeder_state_callback(ds): + def seeder_state_callback(download_state): """ The callback of the seeder download. For now, this only logs the state of the download that's seeder and is useful for debugging purposes. """ - comm.monitor_downloads([ds]) - d = ds.get_download() - print(f"seeder: {repr(d.get_def().get_name())} {dlstatus_strings[ds.get_status()]} {ds.get_progress()}") + community.monitor_downloads([download_state]) + download = download_state.get_download() + status = dlstatus_strings[download_state.get_status()] + logger.info(f"seeder: {repr(download.get_def().get_name())} {status} {download_state.get_progress()}") return 2 upload.set_state_callback(seeder_state_callback) await upload.wait_for_status(DLSTATUS_SEEDING) - return comm + return community -async def create_tunnel_community(temp_path_factory, comm_config: TunnelCommunitySettings = None, - exitnode=False, - start_lt=False) -> TriblerTunnelCommunity: +async def create_tunnel_community(temp_path_factory: TempPathFactory, + config: Optional[TunnelCommunitySettings] = None, + exit_node_enable: bool = False, + start_lt: bool = False) -> TriblerTunnelCommunity: """ Load the tunnel community in a given session. We are using our own tunnel community here instead of the one used in Tribler. @@ -116,68 +115,68 @@ async def create_tunnel_community(temp_path_factory, comm_config: TunnelCommunit await socks_server.start() socks_ports.append(socks_server.port) - dlmgr = None + download_manager = None if start_lt: # If libtorrent tries to connect to the socks5 servers before they are loaded, # it will never recover (on Mac/Linux with Libtorrent >=1.2.0). Therefore, we start # libtorrent afterwards. - dlmgr_settings = LibtorrentSettings() - dlmgr_settings.dht = False - - dlmgr = DownloadManager(state_dir=temp_path_factory.mktemp('state_dir'), - config=dlmgr_settings, - peer_mid=MagicMock(), - socks_listen_ports=socks_ports, - notifier=MagicMock()) - dlmgr.metadata_tmpdir = temp_path_factory.mktemp('metadata_tmpdir') - - comm_config = comm_config or TunnelCommunitySettings() - comm_config.exitnode_enabled = exitnode - mock_ipv8 = MockIPv8("curve25519", - TriblerTunnelCommunity, - settings={"max_circuits": 1}, - config=comm_config, - socks_servers=socks_servers, - dlmgr=dlmgr) + download_manager_settings = LibtorrentSettings() + download_manager_settings.dht = False + + download_manager = DownloadManager(state_dir=temp_path_factory.mktemp('state_dir'), + config=download_manager_settings, + peer_mid=MagicMock(), + socks_listen_ports=socks_ports, + notifier=MagicMock()) + download_manager.metadata_tmpdir = temp_path_factory.mktemp('metadata_tmpdir') + + config = config or TunnelCommunitySettings() + config.exitnode_enabled = exit_node_enable + + ipv8 = MockIPv8("curve25519", + TriblerTunnelCommunity, + settings={"max_circuits": 1}, + config=config, + socks_servers=socks_servers, + dlmgr=download_manager) if start_lt: - dlmgr.peer_mid = mock_ipv8.my_peer.mid - dlmgr.initialize() - dlmgr.is_shutdown_ready = lambda: True - tunnel_community = mock_ipv8.overlay + download_manager.peer_mid = ipv8.my_peer.mid + download_manager.initialize() + download_manager.is_shutdown_ready = lambda: True + tunnel_community = ipv8.overlay - if exitnode: - mock_ipv8.overlay.settings.peer_flags.add(PEER_FLAG_EXIT_BT) - mock_ipv8.overlay.dht_provider = MockDHTProvider( - Peer(mock_ipv8.overlay.my_peer.key, mock_ipv8.overlay.my_estimated_wan)) - mock_ipv8.overlay.settings.remove_tunnel_delay = 0 + if exit_node_enable: + ipv8.overlay.settings.peer_flags.add(PEER_FLAG_EXIT_BT) + ipv8.overlay.dht_provider = MockDHTProvider(Peer(ipv8.overlay.my_peer.key, ipv8.overlay.my_estimated_wan)) + ipv8.overlay.settings.remove_tunnel_delay = 0 return tunnel_community def start_anon_download(tunnel_community: TriblerTunnelCommunity, - seeder_port, - tdef: TorrentDef, - hops=1): + seeder_port: int, + torrent_def: TorrentDef, + hops: int = 1) -> Download: """ Start an anonymous download in the main Tribler session. """ download_manager = tunnel_community.dlmgr - dscfg = DownloadConfig() - dscfg.set_dest_dir(download_manager.state_dir) - dscfg.set_hops(hops) - download = download_manager.start_download(tdef=tdef, config=dscfg) + config = DownloadConfig() + config.set_dest_dir(download_manager.state_dir) + config.set_hops(hops) + download = download_manager.start_download(tdef=torrent_def, config=config) tunnel_community.bittorrent_peers[download] = [("127.0.0.1", seeder_port)] return download async def introduce_peers(communities: List[TriblerTunnelCommunity]): - for i, j in permutations(communities, 2): - i.walk_to(j.endpoint.wan_address) + for community1, community2 in permutations(communities, 2): + community1.walk_to(community2.endpoint.wan_address) await deliver_messages() -async def deliver_messages(timeout=.1): +async def deliver_messages(timeout: float = .1): """ Allow peers to communicate. The strategy is as follows: @@ -186,11 +185,11 @@ async def deliver_messages(timeout=.1): 3. If not, go back to handling calls (step 2) or return, if the timeout has been reached :param timeout: the maximum time to wait for messages to be delivered """ - rtime = 0 + remaining_time = 0 probable_exit = False - while rtime < timeout: + while remaining_time < timeout: await sleep(.01) - rtime += .01 + remaining_time += .01 if len(all_tasks()) < 2: if probable_exit: break @@ -199,87 +198,83 @@ async def deliver_messages(timeout=.1): probable_exit = False -async def create_nodes(proxy_factory, num_relays=1, num_exitnodes=1): +CreateNodesResult = Tuple[List[TriblerTunnelCommunity], List[TriblerTunnelCommunity]] + + +async def create_nodes(proxy_factory: ProxyFactory, num_relays: int = 1, num_exit_nodes: int = 1) -> CreateNodesResult: relays = [] exit_nodes = [] for _ in range(num_relays): relay = await proxy_factory.get() relays.append(relay) - for _ in range(num_exitnodes): + for _ in range(num_exit_nodes): exit_node = await proxy_factory.get(exitnode=True) exit_nodes.append(exit_node) return relays, exit_nodes @pytest.fixture -async def my_comm(tmp_path_factory): - my_comm = await create_tunnel_community(tmp_path_factory, exitnode=False, start_lt=True) +async def tunnel_community(tmp_path_factory: TempPathFactory): + community = await create_tunnel_community(tmp_path_factory, exit_node_enable=False, start_lt=True) - yield my_comm + yield community - await my_comm.dlmgr.shutdown() - await my_comm.unload() + await community.dlmgr.shutdown() + await community.unload() @pytest.mark.tunneltest @pytest.mark.asyncio -@pytest.mark.timeout(40) -async def test_anon_download(proxy_factory, video_seeder: DownloadManager, video_tdef: TorrentDef, logger, - my_comm: TriblerTunnelCommunity): +async def test_anon_download(proxy_factory: ProxyFactory, video_seeder: DownloadManager, video_tdef: TorrentDef, + tunnel_community: TriblerTunnelCommunity, crash_on_error): """ Testing whether an anonymous download over our tunnels works """ - - crash_on_error() - relays, exit_nodes = await create_nodes(proxy_factory) - await introduce_peers([my_comm] + relays + exit_nodes) - dlmgr = my_comm.dlmgr + await introduce_peers([tunnel_community] + relays + exit_nodes) + download_manager = tunnel_community.dlmgr - download = start_anon_download(my_comm, video_seeder.libtorrent_port, video_tdef) + download = start_anon_download(tunnel_community, video_seeder.libtorrent_port, video_tdef) await download.wait_for_status(DLSTATUS_DOWNLOADING) - dlmgr.set_download_states_callback(dlmgr.sesscb_states_callback, interval=.1) + download_manager.set_download_states_callback(download_manager.sesscb_states_callback, interval=.1) - while not my_comm.find_circuits(): - num_verified_peers = len(my_comm.network.verified_peers) + while not tunnel_community.find_circuits(): + num_verified_peers = len(tunnel_community.network.verified_peers) logger.warning("No circuits found - checking again later (verified peers: %d)", num_verified_peers) await sleep(.5) await sleep(.6) - assert my_comm.find_circuits()[0].bytes_up > 0 - assert my_comm.find_circuits()[0].bytes_down > 0 + assert tunnel_community.find_circuits()[0].bytes_up > 0 + assert tunnel_community.find_circuits()[0].bytes_down > 0 @pytest.mark.tunneltest @pytest.mark.asyncio -@pytest.mark.timeout(40) -async def test_hidden_services(proxy_factory, hidden_seeder_comm, video_tdef, logger): +async def test_hidden_services(proxy_factory: ProxyFactory, hidden_seeder_comm: TriblerTunnelCommunity, + video_tdef: TorrentDef, crash_on_error): """ Test the hidden services overlay by constructing an end-to-end circuit and downloading a torrent over it """ + leecher_community = await proxy_factory.get(exitnode=False, start_lt=True) - crash_on_error() - - leecher_comm = await proxy_factory.get(exitnode=False, start_lt=True) - - hidden_seeder_comm.build_tunnels(1) + hidden_seeder_comm.build_tunnels(hops=1) - relays, exit_nodes = await create_nodes(proxy_factory, num_relays=3, num_exitnodes=2) - await introduce_peers([leecher_comm, hidden_seeder_comm] + relays + exit_nodes) + relays, exit_nodes = await create_nodes(proxy_factory, num_relays=3, num_exit_nodes=2) + await introduce_peers([leecher_community, hidden_seeder_comm] + relays + exit_nodes) await deliver_messages(timeout=1) - for comm in [leecher_comm, hidden_seeder_comm] + relays + exit_nodes: - assert len(comm.get_peers()) == 6 + for community in [leecher_community, hidden_seeder_comm] + relays + exit_nodes: + assert len(community.get_peers()) == 6 - progress = Future() + download_finished = asyncio.Event() - def download_state_callback(ds): - leecher_comm.monitor_downloads([ds]) - logger.info("Time: %s, status: %s, progress: %s", time.time(), ds.get_status(), ds.get_progress()) - if ds.get_progress(): - progress.set_result(None) + def download_state_callback(state): + leecher_community.monitor_downloads([state]) + logger.info(f"Time: {time.time()}, status: {state.get_status()}, progress: {state.get_progress()}") + if state.get_progress(): + download_finished.set() return 2 - leecher_comm.build_tunnels(1) + leecher_community.build_tunnels(hops=1) class MockExitDict(dict): def __getitem__(self, key): @@ -289,6 +284,6 @@ def __getitem__(self, key): for e in exit_nodes: e.exit_sockets = MockExitDict(e.exit_sockets) - download = start_anon_download(leecher_comm, hidden_seeder_comm.dlmgr.libtorrent_port, video_tdef, hops=1) + download = start_anon_download(leecher_community, hidden_seeder_comm.dlmgr.libtorrent_port, video_tdef, hops=1) download.set_state_callback(download_state_callback) - await progress + await download_finished.wait()