Skip to content

Commit

Permalink
Ported communities to CommunitySettings
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Oct 5, 2023
1 parent 9cbf824 commit 35d81ca
Show file tree
Hide file tree
Showing 24 changed files with 314 additions and 189 deletions.
1 change: 1 addition & 0 deletions requirements-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ libtorrent==1.2.19
file-read-backwards==3.0.0
Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
human-readable==1.3.2
typing_extensions==4.7.1
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthAccountingTestnetCommunity,
BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.component import Component
Expand Down Expand Up @@ -40,12 +41,14 @@ async def run(self):
store_all_transactions=store_all_transactions)

kwargs = {"max_peers": -1} if unlimited_peers else {}
self.community = bandwidth_cls(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
self._ipv8_component.ipv8.network,
settings=config.bandwidth_accounting,
database=self.database,
**kwargs)
self.community = bandwidth_cls(BandwidthCommunitySettings(
my_peer=self._ipv8_component.peer,
endpoint=self._ipv8_component.ipv8.endpoint,
network=self._ipv8_component.ipv8.network,
settings=config.bandwidth_accounting,
database=self.database,
**kwargs
))

self._ipv8_component.initialise_community_by_default(self.community)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData, EMPTY_SIGNATURE
from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.utilities.unicode import hexlify


class BandwidthCommunitySettings(TriblerSettings):
database: BandwidthDatabase | None = None


class BandwidthAccountingCommunity(TriblerCommunity):
"""
Community around bandwidth accounting and payouts.
"""
community_id = unhexlify('79b25f2867739261780faefede8f25038de9975d')
DB_NAME = 'bandwidth'
version = b'\x02'
settings_class = BandwidthCommunitySettings

def __init__(self, *args, **kwargs) -> None:
def __init__(self, settings: BandwidthCommunitySettings) -> None:
"""
Initialize the community.
:param persistence: The database that stores transactions, will be created if not provided.
:param database_path: The path at which the database will be created. Defaults to the current working directory.
"""
self.database: BandwidthDatabase = kwargs.pop('database', None)
self.database = settings.database
self.random = Random()

super().__init__(*args, **kwargs)
super().__init__(settings)

self.request_cache = RequestCache()
self.my_pk = self.my_peer.public_key.key_to_bin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from ipv8.test.mocking.ipv8 import MockIPv8

from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthAccountingCommunity, BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData, EMPTY_SIGNATURE
Expand All @@ -27,9 +27,10 @@ def bandwidth_database(tmp_path, peer):

@pytest.fixture
async def bw_community(bandwidth_database, peer):
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity,
database=bandwidth_database,
settings=BandwidthAccountingSettings())
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity, BandwidthCommunitySettings(
database=bandwidth_database,
settings=BandwidthAccountingSettings()
))
community = ipv8.get_overlay(BandwidthAccountingCommunity)
yield community
await ipv8.stop()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import annotations

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8

from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.community.cache import BandwidthTransactionSignCache
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
Expand All @@ -21,11 +24,17 @@ def setUp(self):
super().setUp()
self.initialize(BandwidthAccountingCommunity, 2)

def create_node(self):
def create_node(self, settings: BandwidthCommunitySettings | None = None, # pylint: disable=unused-argument
create_dht: bool = False, enable_statistics: bool = False): # pylint: disable=unused-argument
peer = Peer(default_eccrypto.generate_key("curve25519"), address=("1.2.3.4", 5))
db = BandwidthDatabase(db_path=MEMORY_DB, my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity, database=db,
settings=BandwidthAccountingSettings())
db = BandwidthDatabase(db_path=MEMORY_DB,
my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer,
BandwidthAccountingCommunity,
BandwidthCommunitySettings(
database=db,
settings=BandwidthAccountingSettings()
))
return ipv8

def database(self, i):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import time
import uuid
from binascii import unhexlify
Expand All @@ -13,7 +15,10 @@
from tribler.core.components.ipv8.discovery_booster import DiscoveryBooster
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler.core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import (
RemoteCommunitySettings,
RemoteQueryCommunity
)
from tribler.core.components.metadata_store.utils import NoChannelSourcesException
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.simpledefs import CHANNELS_VIEW_UUID
Expand Down Expand Up @@ -70,8 +75,13 @@ def get_last_seen_peers_for_channel(self, channel_pk: bytes, channel_id: int, li
return sorted(channel_peers, key=lambda x: x.last_response, reverse=True)[0:limit]


class GigaCommunitySettings(RemoteCommunitySettings):
notifier: Notifier | None = None


class GigaChannelCommunity(RemoteQueryCommunity):
community_id = unhexlify('d3512d0ff816d8ac672eab29a9c1a3a32e17cb13')
settings_class = GigaCommunitySettings

def create_introduction_response(
self,
Expand All @@ -94,14 +104,12 @@ def create_introduction_response(
new_style=new_style,
)

def __init__(
self, *args, notifier: Notifier = None, **kwargs
): # pylint: disable=unused-argument
def __init__(self, settings: GigaCommunitySettings): # pylint: disable=unused-argument
# ACHTUNG! We create a separate instance of Network for this community because it
# walks aggressively and wants lots of peers, which can interfere with other communities
super().__init__(*args, **kwargs)
super().__init__(settings)

self.notifier = notifier
self.notifier = settings.notifier

# This set contains all the peers that we queried for subscribed channels over time.
# It is emptied regularly. The purpose of this set is to work as a filter so we never query the same
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import time
from collections.abc import Mapping
from dataclasses import asdict, dataclass, fields
from typing import Callable
from unittest.mock import AsyncMock, Mock

import pytest

from ipv8.community import CommunitySettings
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
Expand All @@ -13,7 +17,7 @@
from tribler.core.components.gigachannel.community.gigachannel_community import (
ChannelsPeersMapping,
GigaChannelCommunity,
NoChannelSourcesException,
GigaCommunitySettings, NoChannelSourcesException,
happy_eyeballs_delay
)
from tribler.core.components.gigachannel.community.settings import ChantSettings
Expand Down Expand Up @@ -66,18 +70,20 @@ async def tearDown(self):
metadata_store.shutdown()
await super().tearDown()

def create_node(self, *args, **kwargs):
def create_node(self, settings: CommunitySettings | None = None, # pylint: disable=unused-argument
create_dht: bool = False, enable_statistics: bool = False): # pylint: disable=unused-argument
metadata_store = MetadataStore(
Path(self.temporary_directory()) / f"{self.count}.db",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"),
disable_sync=True,
)
self.metadata_store_set.add(metadata_store)
kwargs['metadata_store'] = metadata_store
kwargs['settings'] = ChantSettings()
kwargs['rqc_settings'] = RemoteQueryCommunitySettings()
node = super().create_node(*args, **kwargs)
node = super().create_node(GigaCommunitySettings(
metadata_store=metadata_store,
settings=ChantSettings(),
rqc_settings=RemoteQueryCommunitySettings()
))

node.overlay.discovery_booster.finish()
notifier = Notifier(loop=self.loop)
Expand Down
12 changes: 6 additions & 6 deletions src/tribler/core/components/gigachannel/gigachannel_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tribler.core.components.database.database_component import DatabaseComponent
from tribler.core.components.gigachannel.community.gigachannel_community import (
GigaChannelCommunity,
GigaChannelTestnetCommunity,
GigaChannelTestnetCommunity, GigaCommunitySettings,
)
from tribler.core.components.gigachannel.community.sync_strategy import RemovePeers
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
Expand All @@ -29,17 +29,17 @@ async def run(self):
db_component = await self.get_component(DatabaseComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
Network(),
community = giga_channel_cls(GigaCommunitySettings(
my_peer=self._ipv8_component.peer,
endpoint=self._ipv8_component.ipv8.endpoint,
network=Network(),
notifier=notifier,
settings=config.chant,
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
max_peers=50,
tribler_db=db_component.db if db_component else None
)
))
self.community = community
self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30)
self._ipv8_component.ipv8.add_strategy(community, RemovePeers(community), INFINITE)
Expand Down
17 changes: 14 additions & 3 deletions src/tribler/core/components/ipv8/ipv8_component.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from ipv8.bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ipv8.community import CommunitySettings
from ipv8.configuration import ConfigBuilder, DISPERSY_BOOTSTRAPPER
from ipv8.dht.churn import PingChurn
from ipv8.dht.discovery import DHTDiscoveryCommunity
Expand Down Expand Up @@ -112,15 +113,25 @@ def make_bootstrapper(self) -> DispersyBootstrapper:

def _init_peer_discovery_community(self):
ipv8 = self.ipv8
community = DiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=100)
community = DiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=100
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, RandomChurn(community), INFINITE)
ipv8.add_strategy(community, PeriodicSimilarity(community), INFINITE)
self._peer_discovery_community = community

def _init_dht_discovery_community(self):
ipv8 = self.ipv8
community = DHTDiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=60)
community = DHTDiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=60
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, PingChurn(community), INFINITE)
self.dht_discovery_community = community
Expand All @@ -136,4 +147,4 @@ async def shutdown(self):
await self.ipv8.unload_overlay(overlay)

await self._task_manager.shutdown_task_manager()
await self.ipv8.stop(stop_loop=False)
await self.ipv8.stop()
16 changes: 12 additions & 4 deletions src/tribler/core/components/ipv8/tribler_community.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from ipv8.community import Community
from __future__ import annotations

from ipv8.community import Community, CommunitySettings

from tribler.core.config.tribler_config_section import TriblerConfigSection


class TriblerSettings(CommunitySettings):
settings: TriblerConfigSection | None = None


class TriblerCommunity(Community):
"""Base class for Tribler communities.
"""

def __init__(self, *args, settings: TriblerConfigSection = None, **kwargs):
super().__init__(*args, **kwargs)
self.settings = settings
settings_class = TriblerSettings

def __init__(self, settings: TriblerSettings):
super().__init__(settings)
self.settings = settings.settings
self.logger.info(f'Init. Settings: {settings}.')
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ipv8.types import Key
from pony.orm import db_session

from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.components.knowledge.community.knowledge_payload import (
RawStatementOperationMessage,
RequestStatementOperationMessage,
Expand All @@ -26,25 +26,31 @@
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes


class KnowledgeSettings(TriblerSettings):
db: TriblerDatabase
key: LibNaCLSK
request_interval = REQUEST_INTERVAL


class KnowledgeCommunity(TriblerCommunity):
""" Community for disseminating tags across the network.
Only tags that are older than 1 minute will be gossiped.
"""

community_id = unhexlify('d7f7bdc8bcd3d9ad23f06f25aa8aab6754eb23a0')
settings_class = KnowledgeSettings

def __init__(self, *args, db: TriblerDatabase, key: LibNaCLSK, request_interval=REQUEST_INTERVAL,
**kwargs):
super().__init__(*args, **kwargs)
self.db = db
self.key = key
def __init__(self, settings: KnowledgeSettings):
super().__init__(settings)
self.db = settings.db
self.key = settings.key
self.requests = OperationsRequests()

self.add_message_handler(RawStatementOperationMessage, self.on_message)
self.add_message_handler(RequestStatementOperationMessage, self.on_request)

self.register_task("request_operations", self.request_operations, interval=request_interval)
self.register_task("request_operations", self.request_operations, interval=settings.request_interval)
self.register_task("clear_requests", self.requests.clear_requests, interval=CLEAR_ALL_REQUESTS_INTERVAL)
self.logger.info('Knowledge community initialized')

Expand Down
Loading

0 comments on commit 35d81ca

Please sign in to comment.