Skip to content

Commit

Permalink
Refactor tunnels tests
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Apr 11, 2022
1 parent e2cc9d3 commit 6ee1097
Showing 1 changed file with 119 additions and 124 deletions.
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import asyncio
import logging
import sys
import time
from asyncio import Future, all_tasks, get_event_loop, sleep
from asyncio import all_tasks, get_event_loop, sleep
from collections import defaultdict
from itertools import permutations
from traceback import print_exception
from typing import List
from typing import List, Optional, Tuple

import pytest
from _pytest.tmpdir import TempPathFactory
from asynctest import MagicMock

from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT
from ipv8.peer import Peer
from ipv8.test.messaging.anonymization import test_community
from ipv8.test.messaging.anonymization.mock import MockDHTProvider
from ipv8.test.mocking.exit_socket import MockTunnelExitSocket
from ipv8.test.mocking.ipv8 import MockIPv8

import pytest

# Pylint does not agree with the way pytest handles fixtures.
# pylint: disable=W0613,W0621
from tribler.core.components.libtorrent.download_manager.download import Download
from tribler.core.components.libtorrent.download_manager.download_config import DownloadConfig
from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.components.libtorrent.settings import LibtorrentSettings
Expand All @@ -31,77 +31,76 @@
from tribler.core.tests.tools.common import TESTS_DATA_DIR
from tribler.core.utilities.simpledefs import DLSTATUS_DOWNLOADING, DLSTATUS_SEEDING, dlstatus_strings

logger = logging.getLogger("TunnelTests")


@pytest.fixture
def crash_on_error():
def exception_handler(loop, context):
exc = context.get('exception')
print_exception(type(exc), exc, exc.__traceback__)
def exception_handler(_, context):
logger.exception(context.get('exception'))
sys.exit(-1)

get_event_loop().set_exception_handler(exception_handler)


class ProxyFactory:

def __init__(self, temp_path_factory):
self.comms = []
self.communities = []
self.temp_path_factory = temp_path_factory

async def get(self, exitnode=False, start_lt=False):
tunn_comm_config = TunnelCommunitySettings()
comm = await create_tunnel_community(self.temp_path_factory, tunn_comm_config, exitnode=exitnode,
start_lt=start_lt)
self.comms.append(comm)
return comm
async def get(self, exitnode=False, start_lt=False) -> TriblerTunnelCommunity:
tunnel_community_config = TunnelCommunitySettings()
community = await create_tunnel_community(self.temp_path_factory, tunnel_community_config,
exit_node_enable=exitnode,
start_lt=start_lt)
self.communities.append(community)
return community


@pytest.fixture
async def logger():
return logging.getLogger("TunnelTests")


@pytest.fixture
async def proxy_factory(tmp_path_factory):
async def proxy_factory(tmp_path_factory: TempPathFactory):
factory = ProxyFactory(tmp_path_factory)
yield factory

for comm in factory.comms:
if comm.dlmgr:
await comm.dlmgr.shutdown()
await comm.unload()
for community in factory.communities:
if community.dlmgr:
await community.dlmgr.shutdown()
await community.unload()
test_community.global_dht_services = defaultdict(list) # Reset the global_dht_services variable


@pytest.fixture
async def hidden_seeder_comm(proxy_factory, video_tdef):
async def hidden_seeder_comm(proxy_factory: ProxyFactory, video_tdef: TorrentDef) -> TriblerTunnelCommunity:
# Also load the tunnel community in the seeder session
comm = await proxy_factory.get(start_lt=True)
comm.build_tunnels(1)
community = await proxy_factory.get(start_lt=True)
community.build_tunnels(1)

dscfg_seed = DownloadConfig()
dscfg_seed.set_dest_dir(TESTS_DATA_DIR)
dscfg_seed.set_hops(1)
upload = comm.dlmgr.start_download(tdef=video_tdef, config=dscfg_seed)
download_config = DownloadConfig()
download_config.set_dest_dir(TESTS_DATA_DIR)
download_config.set_hops(1)
upload = community.dlmgr.start_download(tdef=video_tdef, config=download_config)

def seeder_state_callback(ds):
def seeder_state_callback(download_state):
"""
The callback of the seeder download. For now, this only logs the state of the download that's seeder and is
useful for debugging purposes.
"""
comm.monitor_downloads([ds])
d = ds.get_download()
print(f"seeder: {repr(d.get_def().get_name())} {dlstatus_strings[ds.get_status()]} {ds.get_progress()}")
community.monitor_downloads([download_state])
download = download_state.get_download()
status = dlstatus_strings[download_state.get_status()]
logger.info(f"seeder: {repr(download.get_def().get_name())} {status} {download_state.get_progress()}")
return 2

upload.set_state_callback(seeder_state_callback)

await upload.wait_for_status(DLSTATUS_SEEDING)
return comm
return community


async def create_tunnel_community(temp_path_factory, comm_config: TunnelCommunitySettings = None,
exitnode=False,
start_lt=False) -> TriblerTunnelCommunity:
async def create_tunnel_community(temp_path_factory: TempPathFactory,
config: Optional[TunnelCommunitySettings] = None,
exit_node_enable: bool = False,
start_lt: bool = False) -> TriblerTunnelCommunity:
"""
Load the tunnel community in a given session. We are using our own tunnel community here instead of the one
used in Tribler.
Expand All @@ -116,68 +115,68 @@ async def create_tunnel_community(temp_path_factory, comm_config: TunnelCommunit
await socks_server.start()
socks_ports.append(socks_server.port)

dlmgr = None
download_manager = None
if start_lt:
# If libtorrent tries to connect to the socks5 servers before they are loaded,
# it will never recover (on Mac/Linux with Libtorrent >=1.2.0). Therefore, we start
# libtorrent afterwards.
dlmgr_settings = LibtorrentSettings()
dlmgr_settings.dht = False

dlmgr = DownloadManager(state_dir=temp_path_factory.mktemp('state_dir'),
config=dlmgr_settings,
peer_mid=MagicMock(),
socks_listen_ports=socks_ports,
notifier=MagicMock())
dlmgr.metadata_tmpdir = temp_path_factory.mktemp('metadata_tmpdir')

comm_config = comm_config or TunnelCommunitySettings()
comm_config.exitnode_enabled = exitnode
mock_ipv8 = MockIPv8("curve25519",
TriblerTunnelCommunity,
settings={"max_circuits": 1},
config=comm_config,
socks_servers=socks_servers,
dlmgr=dlmgr)
download_manager_settings = LibtorrentSettings()
download_manager_settings.dht = False

download_manager = DownloadManager(state_dir=temp_path_factory.mktemp('state_dir'),
config=download_manager_settings,
peer_mid=MagicMock(),
socks_listen_ports=socks_ports,
notifier=MagicMock())
download_manager.metadata_tmpdir = temp_path_factory.mktemp('metadata_tmpdir')

config = config or TunnelCommunitySettings()
config.exitnode_enabled = exit_node_enable

ipv8 = MockIPv8("curve25519",
TriblerTunnelCommunity,
settings={"max_circuits": 1},
config=config,
socks_servers=socks_servers,
dlmgr=download_manager)
if start_lt:
dlmgr.peer_mid = mock_ipv8.my_peer.mid
dlmgr.initialize()
dlmgr.is_shutdown_ready = lambda: True
tunnel_community = mock_ipv8.overlay
download_manager.peer_mid = ipv8.my_peer.mid
download_manager.initialize()
download_manager.is_shutdown_ready = lambda: True
tunnel_community = ipv8.overlay

if exitnode:
mock_ipv8.overlay.settings.peer_flags.add(PEER_FLAG_EXIT_BT)
mock_ipv8.overlay.dht_provider = MockDHTProvider(
Peer(mock_ipv8.overlay.my_peer.key, mock_ipv8.overlay.my_estimated_wan))
mock_ipv8.overlay.settings.remove_tunnel_delay = 0
if exit_node_enable:
ipv8.overlay.settings.peer_flags.add(PEER_FLAG_EXIT_BT)
ipv8.overlay.dht_provider = MockDHTProvider(Peer(ipv8.overlay.my_peer.key, ipv8.overlay.my_estimated_wan))
ipv8.overlay.settings.remove_tunnel_delay = 0

return tunnel_community


def start_anon_download(tunnel_community: TriblerTunnelCommunity,
seeder_port,
tdef: TorrentDef,
hops=1):
seeder_port: int,
torrent_def: TorrentDef,
hops: int = 1) -> Download:
"""
Start an anonymous download in the main Tribler session.
"""
download_manager = tunnel_community.dlmgr
dscfg = DownloadConfig()
dscfg.set_dest_dir(download_manager.state_dir)
dscfg.set_hops(hops)
download = download_manager.start_download(tdef=tdef, config=dscfg)
config = DownloadConfig()
config.set_dest_dir(download_manager.state_dir)
config.set_hops(hops)
download = download_manager.start_download(tdef=torrent_def, config=config)
tunnel_community.bittorrent_peers[download] = [("127.0.0.1", seeder_port)]
return download


async def introduce_peers(communities: List[TriblerTunnelCommunity]):
for i, j in permutations(communities, 2):
i.walk_to(j.endpoint.wan_address)
for community1, community2 in permutations(communities, 2):
community1.walk_to(community2.endpoint.wan_address)

await deliver_messages()


async def deliver_messages(timeout=.1):
async def deliver_messages(timeout: float = .1):
"""
Allow peers to communicate.
The strategy is as follows:
Expand All @@ -186,11 +185,11 @@ async def deliver_messages(timeout=.1):
3. If not, go back to handling calls (step 2) or return, if the timeout has been reached
:param timeout: the maximum time to wait for messages to be delivered
"""
rtime = 0
remaining_time = 0
probable_exit = False
while rtime < timeout:
while remaining_time < timeout:
await sleep(.01)
rtime += .01
remaining_time += .01
if len(all_tasks()) < 2:
if probable_exit:
break
Expand All @@ -199,87 +198,83 @@ async def deliver_messages(timeout=.1):
probable_exit = False


async def create_nodes(proxy_factory, num_relays=1, num_exitnodes=1):
CreateNodesResult = Tuple[List[TriblerTunnelCommunity], List[TriblerTunnelCommunity]]


async def create_nodes(proxy_factory: ProxyFactory, num_relays: int = 1, num_exit_nodes: int = 1) -> CreateNodesResult:
relays = []
exit_nodes = []
for _ in range(num_relays):
relay = await proxy_factory.get()
relays.append(relay)
for _ in range(num_exitnodes):
for _ in range(num_exit_nodes):
exit_node = await proxy_factory.get(exitnode=True)
exit_nodes.append(exit_node)
return relays, exit_nodes


@pytest.fixture
async def my_comm(tmp_path_factory):
my_comm = await create_tunnel_community(tmp_path_factory, exitnode=False, start_lt=True)
async def tunnel_community(tmp_path_factory: TempPathFactory):
community = await create_tunnel_community(tmp_path_factory, exit_node_enable=False, start_lt=True)

yield my_comm
yield community

await my_comm.dlmgr.shutdown()
await my_comm.unload()
await community.dlmgr.shutdown()
await community.unload()


@pytest.mark.tunneltest
@pytest.mark.asyncio
@pytest.mark.timeout(40)
async def test_anon_download(proxy_factory, video_seeder: DownloadManager, video_tdef: TorrentDef, logger,
my_comm: TriblerTunnelCommunity):
async def test_anon_download(proxy_factory: ProxyFactory, video_seeder: DownloadManager, video_tdef: TorrentDef,
tunnel_community: TriblerTunnelCommunity, crash_on_error):
"""
Testing whether an anonymous download over our tunnels works
"""

crash_on_error()

relays, exit_nodes = await create_nodes(proxy_factory)
await introduce_peers([my_comm] + relays + exit_nodes)
dlmgr = my_comm.dlmgr
await introduce_peers([tunnel_community] + relays + exit_nodes)
download_manager = tunnel_community.dlmgr

download = start_anon_download(my_comm, video_seeder.libtorrent_port, video_tdef)
download = start_anon_download(tunnel_community, video_seeder.libtorrent_port, video_tdef)
await download.wait_for_status(DLSTATUS_DOWNLOADING)
dlmgr.set_download_states_callback(dlmgr.sesscb_states_callback, interval=.1)
download_manager.set_download_states_callback(download_manager.sesscb_states_callback, interval=.1)

while not my_comm.find_circuits():
num_verified_peers = len(my_comm.network.verified_peers)
while not tunnel_community.find_circuits():
num_verified_peers = len(tunnel_community.network.verified_peers)
logger.warning("No circuits found - checking again later (verified peers: %d)", num_verified_peers)
await sleep(.5)
await sleep(.6)
assert my_comm.find_circuits()[0].bytes_up > 0
assert my_comm.find_circuits()[0].bytes_down > 0
assert tunnel_community.find_circuits()[0].bytes_up > 0
assert tunnel_community.find_circuits()[0].bytes_down > 0


@pytest.mark.tunneltest
@pytest.mark.asyncio
@pytest.mark.timeout(40)
async def test_hidden_services(proxy_factory, hidden_seeder_comm, video_tdef, logger):
async def test_hidden_services(proxy_factory: ProxyFactory, hidden_seeder_comm: TriblerTunnelCommunity,
video_tdef: TorrentDef, crash_on_error):
"""
Test the hidden services overlay by constructing an end-to-end circuit and downloading a torrent over it
"""
leecher_community = await proxy_factory.get(exitnode=False, start_lt=True)

crash_on_error()

leecher_comm = await proxy_factory.get(exitnode=False, start_lt=True)

hidden_seeder_comm.build_tunnels(1)
hidden_seeder_comm.build_tunnels(hops=1)

relays, exit_nodes = await create_nodes(proxy_factory, num_relays=3, num_exitnodes=2)
await introduce_peers([leecher_comm, hidden_seeder_comm] + relays + exit_nodes)
relays, exit_nodes = await create_nodes(proxy_factory, num_relays=3, num_exit_nodes=2)
await introduce_peers([leecher_community, hidden_seeder_comm] + relays + exit_nodes)
await deliver_messages(timeout=1)

for comm in [leecher_comm, hidden_seeder_comm] + relays + exit_nodes:
assert len(comm.get_peers()) == 6
for community in [leecher_community, hidden_seeder_comm] + relays + exit_nodes:
assert len(community.get_peers()) == 6

progress = Future()
download_finished = asyncio.Event()

def download_state_callback(ds):
leecher_comm.monitor_downloads([ds])
logger.info("Time: %s, status: %s, progress: %s", time.time(), ds.get_status(), ds.get_progress())
if ds.get_progress():
progress.set_result(None)
def download_state_callback(state):
leecher_community.monitor_downloads([state])
logger.info(f"Time: {time.time()}, status: {state.get_status()}, progress: {state.get_progress()}")
if state.get_progress():
download_finished.set()
return 2

leecher_comm.build_tunnels(1)
leecher_community.build_tunnels(hops=1)

class MockExitDict(dict):
def __getitem__(self, key):
Expand All @@ -289,6 +284,6 @@ def __getitem__(self, key):
for e in exit_nodes:
e.exit_sockets = MockExitDict(e.exit_sockets)

download = start_anon_download(leecher_comm, hidden_seeder_comm.dlmgr.libtorrent_port, video_tdef, hops=1)
download = start_anon_download(leecher_community, hidden_seeder_comm.dlmgr.libtorrent_port, video_tdef, hops=1)
download.set_state_callback(download_state_callback)
await progress
await download_finished.wait()

0 comments on commit 6ee1097

Please sign in to comment.