Skip to content

Commit

Permalink
Ported communities to CommunitySettings
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Oct 4, 2023
1 parent 09ed739 commit c17e726
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,33 @@
)
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData, EMPTY_SIGNATURE
from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.utilities.unicode import hexlify


class BandwidthCommunitySettings(TriblerSettings):
database: BandwidthDatabase | None = None


class BandwidthAccountingCommunity(TriblerCommunity):
"""
Community around bandwidth accounting and payouts.
"""
community_id = unhexlify('79b25f2867739261780faefede8f25038de9975d')
DB_NAME = 'bandwidth'
version = b'\x02'
settings_class = BandwidthCommunitySettings

def __init__(self, *args, **kwargs) -> None:
def __init__(self, settings: BandwidthCommunitySettings) -> None:
"""
Initialize the community.
:param persistence: The database that stores transactions, will be created if not provided.
:param database_path: The path at which the database will be created. Defaults to the current working directory.
"""
self.database: BandwidthDatabase = kwargs.pop('database', None)
self.database = settings.database
self.random = Random()

super().__init__(*args, **kwargs)
super().__init__(settings)

self.request_cache = RequestCache()
self.my_pk = self.my_peer.public_key.key_to_bin()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8

from tribler.core.components.bandwidth_accounting.community.bandwidth_accounting_community import (
BandwidthAccountingCommunity,
BandwidthAccountingCommunity, BandwidthCommunitySettings,
)
from tribler.core.components.bandwidth_accounting.community.cache import BandwidthTransactionSignCache
from tribler.core.components.bandwidth_accounting.db.database import BandwidthDatabase
Expand All @@ -21,11 +23,17 @@ def setUp(self):
super().setUp()
self.initialize(BandwidthAccountingCommunity, 2)

def create_node(self):
def create_node(self, settings: BandwidthCommunitySettings | None = None,
create_dht: bool = False, enable_statistics: bool = False):
peer = Peer(default_eccrypto.generate_key("curve25519"), address=("1.2.3.4", 5))
db = BandwidthDatabase(db_path=MEMORY_DB, my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer, BandwidthAccountingCommunity, database=db,
settings=BandwidthAccountingSettings())
db = BandwidthDatabase(db_path=MEMORY_DB,
my_pub_key=peer.public_key.key_to_bin())
ipv8 = MockIPv8(peer,
BandwidthAccountingCommunity,
BandwidthCommunitySettings(
database=db,
settings=BandwidthAccountingSettings()
))
return ipv8

def database(self, i):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import time
import uuid
from binascii import unhexlify
Expand All @@ -13,7 +15,8 @@
from tribler.core.components.ipv8.discovery_booster import DiscoveryBooster
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler.core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import \
RemoteCommunitySettings, RemoteQueryCommunity
from tribler.core.components.metadata_store.utils import NoChannelSourcesException
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.simpledefs import CHANNELS_VIEW_UUID
Expand Down Expand Up @@ -70,8 +73,13 @@ def get_last_seen_peers_for_channel(self, channel_pk: bytes, channel_id: int, li
return sorted(channel_peers, key=lambda x: x.last_response, reverse=True)[0:limit]


class GigaCommunitySettings(RemoteCommunitySettings):
notifier: Notifier | None = None


class GigaChannelCommunity(RemoteQueryCommunity):
community_id = unhexlify('d3512d0ff816d8ac672eab29a9c1a3a32e17cb13')
settings_class = GigaCommunitySettings

def create_introduction_response(
self,
Expand All @@ -94,14 +102,12 @@ def create_introduction_response(
new_style=new_style,
)

def __init__(
self, *args, notifier: Notifier = None, **kwargs
): # pylint: disable=unused-argument
def __init__(self, settings: GigaCommunitySettings): # pylint: disable=unused-argument
# ACHTUNG! We create a separate instance of Network for this community because it
# walks aggressively and wants lots of peers, which can interfere with other communities
super().__init__(*args, **kwargs)
super().__init__(settings)

self.notifier = notifier
self.notifier = settings.notifier

# This set contains all the peers that we queried for subscribed channels over time.
# It is emptied regularly. The purpose of this set is to work as a filter so we never query the same
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import time
from collections.abc import Mapping
from dataclasses import asdict, dataclass, fields
from typing import Callable
from unittest.mock import AsyncMock, Mock

import pytest

from ipv8.community import CommunitySettings
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.peer import Peer
from ipv8.test.base import TestBase
Expand All @@ -13,7 +17,7 @@
from tribler.core.components.gigachannel.community.gigachannel_community import (
ChannelsPeersMapping,
GigaChannelCommunity,
NoChannelSourcesException,
GigaCommunitySettings, NoChannelSourcesException,
happy_eyeballs_delay
)
from tribler.core.components.gigachannel.community.settings import ChantSettings
Expand Down Expand Up @@ -66,18 +70,20 @@ async def tearDown(self):
metadata_store.shutdown()
await super().tearDown()

def create_node(self, *args, **kwargs):
def create_node(self, settings: CommunitySettings | None = None, create_dht: bool = False,
enable_statistics: bool = False):
metadata_store = MetadataStore(
Path(self.temporary_directory()) / f"{self.count}.db",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"),
disable_sync=True,
)
self.metadata_store_set.add(metadata_store)
kwargs['metadata_store'] = metadata_store
kwargs['settings'] = ChantSettings()
kwargs['rqc_settings'] = RemoteQueryCommunitySettings()
node = super().create_node(*args, **kwargs)
node = super().create_node(GigaCommunitySettings(
metadata_store=metadata_store,
settings=ChantSettings(),
rqc_settings=RemoteQueryCommunitySettings()
))

node.overlay.discovery_booster.finish()
notifier = Notifier(loop=self.loop)
Expand Down
17 changes: 14 additions & 3 deletions src/tribler/core/components/ipv8/ipv8_component.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional

from ipv8.bootstrapping.dispersy.bootstrapper import DispersyBootstrapper
from ipv8.community import CommunitySettings
from ipv8.configuration import ConfigBuilder, DISPERSY_BOOTSTRAPPER
from ipv8.dht.churn import PingChurn
from ipv8.dht.discovery import DHTDiscoveryCommunity
Expand Down Expand Up @@ -112,15 +113,25 @@ def make_bootstrapper(self) -> DispersyBootstrapper:

def _init_peer_discovery_community(self):
ipv8 = self.ipv8
community = DiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=100)
community = DiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=100
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, RandomChurn(community), INFINITE)
ipv8.add_strategy(community, PeriodicSimilarity(community), INFINITE)
self._peer_discovery_community = community

def _init_dht_discovery_community(self):
ipv8 = self.ipv8
community = DHTDiscoveryCommunity(self.peer, ipv8.endpoint, ipv8.network, max_peers=60)
community = DHTDiscoveryCommunity(CommunitySettings(
my_peer=self.peer,
endpoint=ipv8.endpoint,
network=ipv8.network,
max_peers=60
))
self.initialise_community_by_default(community)
ipv8.add_strategy(community, PingChurn(community), INFINITE)
self.dht_discovery_community = community
Expand All @@ -136,4 +147,4 @@ async def shutdown(self):
await self.ipv8.unload_overlay(overlay)

await self._task_manager.shutdown_task_manager()
await self.ipv8.stop(stop_loop=False)
await self.ipv8.stop()
16 changes: 12 additions & 4 deletions src/tribler/core/components/ipv8/tribler_community.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
from ipv8.community import Community
from __future__ import annotations

from ipv8.community import Community, CommunitySettings

from tribler.core.config.tribler_config_section import TriblerConfigSection


class TriblerSettings(CommunitySettings):
settings: TriblerConfigSection | None = None


class TriblerCommunity(Community):
"""Base class for Tribler communities.
"""

def __init__(self, *args, settings: TriblerConfigSection = None, **kwargs):
super().__init__(*args, **kwargs)
self.settings = settings
settings_class = TriblerSettings

def __init__(self, settings: TriblerSettings):
super().__init__(settings)
self.settings = settings.settings
self.logger.info(f'Init. Settings: {settings}.')
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import struct
import time
Expand All @@ -14,7 +16,7 @@

from tribler.core.components.ipv8.eva.protocol import EVAProtocol
from tribler.core.components.ipv8.eva.result import TransferResult
from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.ipv8.tribler_community import TriblerCommunity, TriblerSettings
from tribler.core.components.knowledge.community.knowledge_validator import is_valid_resource
from tribler.core.components.database.db.tribler_database import ResourceType
from tribler.core.components.metadata_store.db.orm_bindings.channel_metadata import LZ4_EMPTY_ARCHIVE, entries_to_chunk
Expand Down Expand Up @@ -128,21 +130,24 @@ def on_timeout(self):
pass


class RemoteCommunitySettings(TriblerSettings):
rqc_settings: RemoteQueryCommunitySettings | None = None
metadata_store: MetadataStore | None = None
tribler_db = None


class RemoteQueryCommunity(TriblerCommunity):
"""
Community for general purpose SELECT-like queries into remote Channels database
"""
settings_class = RemoteCommunitySettings

def __init__(self, my_peer, endpoint, network,
rqc_settings: RemoteQueryCommunitySettings = None,
metadata_store=None,
tribler_db=None,
**kwargs):
super().__init__(my_peer, endpoint, network=network, **kwargs)
def __init__(self, settings: RemoteCommunitySettings):
super().__init__(settings)

self.rqc_settings = rqc_settings
self.mds: MetadataStore = metadata_store
self.tribler_db = tribler_db
self.rqc_settings = settings.rqc_settings
self.mds = settings.metadata_store
self.tribler_db = settings.tribler_db
# This object stores requests for "select" queries that we sent to other hosts.
# We keep track of peers we actually requested for data so people can't randomly push spam at us.
# Also, this keeps track of hosts we responded to. There is a possibility that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import asyncio
import random
import string
Expand All @@ -9,6 +11,8 @@
from unittest.mock import Mock, patch

import pytest

from ipv8.community import CommunitySettings
from ipv8.keyvault.crypto import default_eccrypto
from ipv8.test.base import TestBase
from pony.orm import db_session
Expand All @@ -18,7 +22,7 @@
from tribler.core.components.metadata_store.db.serialization import CHANNEL_THUMBNAIL, CHANNEL_TORRENT, REGULAR_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import (
RemoteQueryCommunity,
RemoteCommunitySettings, RemoteQueryCommunity,
sanitize_query,
)
from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
Expand Down Expand Up @@ -73,17 +77,17 @@ async def tearDown(self):
metadata_store.shutdown()
await super().tearDown()

def create_node(self, *args, **kwargs):
def create_node(self, settings: CommunitySettings | None = None, create_dht: bool = False,
enable_statistics: bool = False):
metadata_store = MetadataStore(
Path(self.temporary_directory()) / f"{self.count}.db",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"),
disable_sync=True,
)
self.metadata_store_set.add(metadata_store)
kwargs['metadata_store'] = metadata_store
kwargs['rqc_settings'] = RemoteQueryCommunitySettings()
node = super().create_node(*args, **kwargs)
node = super().create_node(RemoteCommunitySettings(metadata_store=metadata_store,
rqc_settings=RemoteQueryCommunitySettings()))
self.count += 1
return node

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from ipv8.lazy_community import lazy_wrapper
from pony.orm import db_session

from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.metadata_store.remote_query_community.remote_query_community import \
RemoteCommunitySettings, RemoteQueryCommunity
from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload
from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
Expand All @@ -19,6 +20,10 @@
from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker


class PopularityCommunitySettings(RemoteCommunitySettings):
torrent_checker: TorrentChecker | None = None


class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin):
"""
Community for disseminating the content across the network.
Expand All @@ -36,11 +41,12 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin):
GOSSIP_RANDOM_TORRENT_COUNT = 10

community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648')
settings_class = PopularityCommunitySettings

def __init__(self, *args, torrent_checker=None, **kwargs):
def __init__(self, settings: PopularityCommunitySettings):
# Creating a separate instance of Network for this community to find more peers
super().__init__(*args, **kwargs)
self.torrent_checker: TorrentChecker = torrent_checker
super().__init__(settings)
self.torrent_checker = settings.torrent_checker

self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health)
self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request)
Expand Down
Loading

0 comments on commit c17e726

Please sign in to comment.