Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated the IPv8 pointer #7620

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ sphinxcontrib-openapi==0.7.0
configobj==5.0.6
mistune==0.8.4 # sphinxcontrib-openapi==0.7.0 cannot work with the latest mistune version (2.0.0)
MarkupSafe==2.0.1 # used by jinja2; 2.1.0 version removes soft_unicode and breaks jinja2-2.11.3
pyipv8==2.8.0
pyipv8==2.12.0
setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability
3 changes: 2 additions & 1 deletion requirements-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ sentry-sdk==1.31.0
yappi==1.4.0
yarl==1.9.2 # keep this dependency higher than 1.6.3. See: https://github.com/aio-libs/yarl/issues/517
bitarray==2.7.6
pyipv8==2.11.0
pyipv8==2.12.0
libtorrent==1.2.19
file-read-backwards==3.0.0
Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
human-readable==1.3.2
typing_extensions==4.7.1
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthAccountingTestnetCommunity,
BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.component import Component
Expand Down Expand Up @@ -40,12 +41,14 @@ async def run(self):
store_all_transactions=store_all_transactions)

kwargs = {"max_peers": -1} if unlimited_peers else {}
self.community = bandwidth_cls(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
settings=config.bandwidth_accounting,
database=self.database,
**kwargs)
self.community = bandwidth_cls(BandwidthCommunitySettings(
my_peer=self._ipv8_component.peer,
endpoint=self._ipv8_component.ipv8.endpoint,
network=self._ipv8_component.ipv8.network,
settings=config.bandwidth_accounting,
database=self.database,
**kwargs
))

self._ipv8_component.initialise_community_by_default(self.community)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData, EMPTY_SIGNATURE
from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.utilities.unicode import hexlify


class BandwidthCommunitySettings(TriblerSettings):
database: BandwidthDatabase | None = None


class BandwidthAccountingCommunity(TriblerCommunity):
"""
Community around bandwidth accounting and payouts.
"""
community_id = unhexlify('79b25f2867739261780faefede8f25038de9975d')
DB_NAME = 'bandwidth'
version = b'\x02'
settings_class = BandwidthCommunitySettings

def __init__(self, *args, **kwargs) -> None:
def __init__(self, settings: BandwidthCommunitySettings) -> None:
"""
Initialize the community.
:param persistence: The database that stores transactions, will be created if not provided.
:param database_path: The path at which the database will be created. Defaults to the current working directory.
"""
self.database: BandwidthDatabase = kwargs.pop('database', None)
self.database = settings.database
self.random = Random()

super().__init__(*args, **kwargs)
super().__init__(settings)

self.request_cache = RequestCache()
self.my_pk = self.my_peer.public_key.key_to_bin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ipv8.test.mocking.ipv8 import MockIPv8

from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthAccountingCommunity, BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData, EMPTY_SIGNATURE
Expand All @@ -27,9 +27,10 @@ def bandwidth_database(tmp_path, peer):

@pytest.fixture
async def bw_community(bandwidth_database, peer):
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity,
database=bandwidth_database,
settings=BandwidthAccountingSettings())
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity, BandwidthCommunitySettings(
database=bandwidth_database,
settings=BandwidthAccountingSettings()
))
community = ipv8.get_overlay(BandwidthAccountingCommunity)
yield community
await ipv8.stop()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import annotations

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8

from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.community.cache import BandwidthTransactionSignCache
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
Expand All @@ -21,11 +24,17 @@ def setUp(self):
super().setUp()
self.initialize(BandwidthAccountingCommunity, 2)

def create_node(self):
def create_node(self, settings: BandwidthCommunitySettings | None = None, # pylint: disable=unused-argument
create_dht: bool = False, enable_statistics: bool = False): # pylint: disable=unused-argument
peer = Peer(default_eccrypto.generate_key("curve25519"), address=("1.2.3.4", 5))
db = BandwidthDatabase(db_path=MEMORY_DB, my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity, database=db,
settings=BandwidthAccountingSettings())
db = BandwidthDatabase(db_path=MEMORY_DB,
my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer,
BandwidthAccountingCommunity,
BandwidthCommunitySettings(
database=db,
settings=BandwidthAccountingSettings()
))
return ipv8

def database(self, i):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import time
import uuid
from binascii import unhexlify
Expand All @@ -13,7 +15,10 @@
from tribler.core.components.ipv8.discovery_booster import DiscoveryBooster
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler.core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import (
RemoteCommunitySettings,
RemoteQueryCommunity
)
from tribler.core.components.metadata_store.utils import NoChannelSourcesException
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.simpledefs import CHANNELS_VIEW_UUID
Expand Down Expand Up @@ -70,8 +75,13 @@ def get_last_seen_peers_for_channel(self, channel_pk: bytes, channel_id: int, li
return sorted(channel_peers, key=lambda x: x.last_response, reverse=True)[0:limit]


class GigaCommunitySettings(RemoteCommunitySettings):
notifier: Notifier | None = None


class GigaChannelCommunity(RemoteQueryCommunity):
community_id = unhexlify('d3512d0ff816d8ac672eab29a9c1a3a32e17cb13')
settings_class = GigaCommunitySettings

def create_introduction_response(
self,
Expand All @@ -94,14 +104,12 @@ def create_introduction_response(
new_style=new_style,
)

def __init__(
self, *args, notifier: Notifier = None, **kwargs
): # pylint: disable=unused-argument
def __init__(self, settings: GigaCommunitySettings): # pylint: disable=unused-argument
# ACHTUNG! We create a separate instance of Network for this community because it
# walks aggressively and wants lots of peers, which can interfere with other communities
super().__init__(*args, **kwargs)
super().__init__(settings)

self.notifier = notifier
self.notifier = settings.notifier

# This set contains all the peers that we queried for subscribed channels over time.
# It is emptied regularly. The purpose of this set is to work as a filter so we never query the same
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import time
from collections.abc import Mapping
from dataclasses import asdict, dataclass, fields
from typing import Callable
from unittest.mock import AsyncMock, Mock

import pytest

from ipv8.community import CommunitySettings
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
Expand All @@ -13,7 +17,7 @@
from tribler.core.components.gigachannel.community.gigachannel_community import (
ChannelsPeersMapping,
GigaChannelCommunity,
NoChannelSourcesException,
GigaCommunitySettings, NoChannelSourcesException,
happy_eyeballs_delay
)
from tribler.core.components.gigachannel.community.settings import ChantSettings
Expand Down Expand Up @@ -66,18 +70,20 @@ async def tearDown(self):
metadata_store.shutdown()
await super().tearDown()

def create_node(self, *args, **kwargs):
def create_node(self, settings: CommunitySettings | None = None, # pylint: disable=unused-argument
create_dht: bool = False, enable_statistics: bool = False): # pylint: disable=unused-argument
metadata_store = MetadataStore(
Path(self.temporary_directory()) / f"{self.count}.db",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"),
disable_sync=True,
)
self.metadata_store_set.add(metadata_store)
kwargs['metadata_store'] = metadata_store
kwargs['settings'] = ChantSettings()
kwargs['rqc_settings'] = RemoteQueryCommunitySettings()
node = super().create_node(*args, **kwargs)
node = super().create_node(GigaCommunitySettings(
metadata_store=metadata_store,
settings=ChantSettings(),
rqc_settings=RemoteQueryCommunitySettings()
))

node.overlay.discovery_booster.finish()
notifier = Notifier(loop=self.loop)
Expand Down
12 changes: 6 additions & 6 deletions src/tribler/core/components/gigachannel/gigachannel_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tribler.core.components.database.database_component import DatabaseComponent
from tribler.core.components.gigachannel.community.gigachannel_community import (
GigaChannelCommunity,
GigaChannelTestnetCommunity,
GigaChannelTestnetCommunity, GigaCommunitySettings,
)
from tribler.core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
Expand All @@ -29,17 +29,17 @@ async def run(self):
db_component = await self.get_component(DatabaseComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
Network(),
community = giga_channel_cls(GigaCommunitySettings(
my_peer=self._ipv8_component.peer,
endpoint=self._ipv8_component.ipv8.endpoint,
network=Network(),
notifier=notifier,
settings=config.chant,
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
max_peers=50,
tribler_db=db_component.db if db_component else None
)
))
self.community = community
self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30)
self._ipv8_component.ipv8.add_strategy(community, RemovePeers(community), INFINITE)
Expand Down
17 changes: 14 additions & 3 deletions src/tribler/core/components/ipv8/ipv8_component.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from ipv8.bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ipv8.community import CommunitySettings
from ipv8.configuration import ConfigBuilder, DISPERSY_BOOTSTRAPPER
from ipv8.dht.churn import PingChurn
from ipv8.dht.discovery import DHTDiscoveryCommunity
Expand Down Expand Up @@ -112,15 +113,25 @@ def make_bootstrapper(self) -> DispersyBootstrapper:

def _init_peer_discovery_community(self):
ipv8 = self.ipv8
community = DiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=100)
community = DiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=100
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, RandomChurn(community), INFINITE)
ipv8.add_strategy(community, PeriodicSimilarity(community), INFINITE)
self._peer_discovery_community = community

def _init_dht_discovery_community(self):
ipv8 = self.ipv8
community = DHTDiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=60)
community = DHTDiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=60
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, PingChurn(community), INFINITE)
self.dht_discovery_community = community
Expand All @@ -136,4 +147,4 @@ async def shutdown(self):
await self.ipv8.unload_overlay(overlay)

await self._task_manager.shutdown_task_manager()
await self.ipv8.stop(stop_loop=False)
await self.ipv8.stop()
16 changes: 12 additions & 4 deletions src/tribler/core/components/ipv8/tribler_community.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from ipv8.community import Community
from __future__ import annotations

from ipv8.community import Community, CommunitySettings

from tribler.core.config.tribler_config_section import TriblerConfigSection


class TriblerSettings(CommunitySettings):
settings: TriblerConfigSection | None = None


class TriblerCommunity(Community):
"""Base class for Tribler communities.
"""

def __init__(self, *args, settings: TriblerConfigSection = None, **kwargs):
super().__init__(*args, **kwargs)
self.settings = settings
settings_class = TriblerSettings

def __init__(self, settings: TriblerSettings):
super().__init__(settings)
self.settings = settings.settings
self.logger.info(f'Init. Settings: {settings}.')
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ipv8.types import Key
from pony.orm import db_session

from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.components.knowledge.community.knowledge_payload import (
RawStatementOperationMessage,
RequestStatementOperationMessage,
Expand All @@ -26,25 +26,31 @@
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes


class KnowledgeSettings(TriblerSettings):
db: TriblerDatabase
key: LibNaCLSK
request_interval = REQUEST_INTERVAL


class KnowledgeCommunity(TriblerCommunity):
""" Community for disseminating tags across the network.
Only tags that are older than 1 minute will be gossiped.
"""

community_id = unhexlify('d7f7bdc8bcd3d9ad23f06f25aa8aab6754eb23a0')
settings_class = KnowledgeSettings

def __init__(self, *args, db: TriblerDatabase, key: LibNaCLSK, request_interval=REQUEST_INTERVAL,
**kwargs):
super().__init__(*args, **kwargs)
self.db = db
self.key = key
def __init__(self, settings: KnowledgeSettings):
super().__init__(settings)
self.db = settings.db
self.key = settings.key
self.requests = OperationsRequests()

self.add_message_handler(RawStatementOperationMessage, self.on_message)
self.add_message_handler(RequestStatementOperationMessage, self.on_request)

self.register_task("request_operations", self.request_operations, interval=request_interval)
self.register_task("request_operations", self.request_operations, interval=settings.request_interval)
self.register_task("clear_requests", self.requests.clear_requests, interval=CLEAR_ALL_REQUESTS_INTERVAL)
self.logger.info('Knowledge community initialized')

Expand Down
Loading
Loading