Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added basic query functionality to bandwidth accounting #5689

Merged
merged 1 commit into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions build/systemd/bandwidth-crawler.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[Unit]
# No surrounding quotes: systemd takes the Description value literally, so quotes
# would show up verbatim in `systemctl status` and in the journal.
Description=Bandwidth crawler
# After= only orders this unit relative to network-online.target; Wants= is what
# actually pulls the target into the boot transaction.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=crawler
Group=crawler
Restart=always
# The PYTHONPATH entries are relative; the process is started in WorkingDirectory below.
Environment=PYTHONPATH=./src/pyipv8:./src/tribler-common:./src/tribler-core
WorkingDirectory=/opt/tribler
# EXTRA_CRAWLER_ARGS is not defined in this unit; presumably it is supplied through a
# drop-in or an EnvironmentFile - confirm with the deployment setup.
ExecStart=/usr/bin/python3 src/tribler-core/run_bandwidth_crawler.py --statedir /var/lib/crawler $EXTRA_CRAWLER_ARGS

[Install]
WantedBy=multi-user.target
71 changes: 71 additions & 0 deletions src/tribler-core/run_bandwidth_crawler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
This executable script starts a Tribler instance and joins the BandwidthAccountingCommunity.
"""
import argparse
import sys
from asyncio import ensure_future, get_event_loop
from pathlib import Path

from ipv8.loader import IPv8CommunityLoader

from tribler_core.config.tribler_config import TriblerConfig
from tribler_core.modules.bandwidth_accounting.database import BandwidthDatabase
from tribler_core.modules.bandwidth_accounting.settings import BandwidthAccountingSettings
from tribler_core.modules.ipv8_module_catalog import BandwidthCommunityLauncher
from tribler_core.session import Session


class PortAction(argparse.Action):
    """
    Argparse action that validates a port number before storing it on the namespace.
    """

    def __call__(self, _, namespace, values, option_string=None):
        """
        Validate the supplied port and store it as an integer under ``self.dest``.

        :param _: The argument parser invoking this action (unused).
        :param namespace: The namespace object on which the parsed value is stored.
        :param values: The port as provided on the command line; may be a string or an integer,
                       depending on the ``type`` passed to ``add_argument``.
        :param option_string: The option string that triggered this action (unused).
        :raises argparse.ArgumentError: If the value is not an integer in the range 1..65535.
        """
        # The value may arrive as a string (e.g. when the argument is declared with type=str).
        # Comparing a string with an integer raises a TypeError, so normalise to int first.
        try:
            port = int(values)
        except (TypeError, ValueError):
            raise argparse.ArgumentError(self, "Invalid port number") from None

        if not 0 < port < 2**16:
            raise argparse.ArgumentError(self, "Invalid port number")
        setattr(namespace, self.dest, port)


class BandwidthCommunityCrawlerLauncher(BandwidthCommunityLauncher):
    """
    Launcher that configures the bandwidth accounting community for crawling.
    """

    def get_kwargs(self, session):
        """
        Build the keyword arguments for the community started by this launcher.

        :param session: The session providing the state directory and the trustchain keypair.
        :return: A dictionary with the keys "database", "settings" and "max_peers".
        """
        # The crawler sends out queries every 5 seconds instead of using the default interval.
        crawler_settings = BandwidthAccountingSettings()
        crawler_settings.outgoing_query_interval = 5

        # The crawler database keeps every transaction it sees (store_all_transactions=True).
        db_path = session.config.get_state_dir() / "sqlite" / "bandwidth.db"
        own_public_key = session.trustchain_keypair.pub().key_to_bin()
        crawler_database = BandwidthDatabase(db_path, own_public_key, store_all_transactions=True)

        # NOTE(review): max_peers=-1 presumably means "no upper limit" - confirm against IPv8.
        return dict(database=crawler_database, settings=crawler_settings, max_peers=-1)


async def start_crawler(tribler_config):
    """
    Create a session from the given configuration and start it with the crawler launcher.

    :param tribler_config: The configuration used to construct the session.
    """
    crawler_session = Session(tribler_config)

    # Replace the session's community loader with a fresh one; the only launcher
    # registered on it here is the crawler variant of the bandwidth community launcher.
    community_loader = IPv8CommunityLoader()
    crawler_session.ipv8_community_loader = community_loader
    community_loader.set_launcher(BandwidthCommunityCrawlerLauncher())

    await crawler_session.start()


if __name__ == "__main__":
    # Parse the command line options of the crawler.
    parser = argparse.ArgumentParser(description='Start a crawler in the bandwidth accounting community')
    parser.add_argument('--statedir', '-s', default='bw_crawler', type=str, help='Use an alternate statedir')
    # type=int: argparse applies the type conversion before invoking the action, and PortAction
    # performs a numeric range check. With type=str that check compared a string with an integer.
    # The range check rejects 0, so the advertised range starts at 1.
    parser.add_argument('--restapi', '-p', default=8085, type=int, help='Use an alternate port for the REST API',
                        action=PortAction, metavar='{1..65535}')
    args = parser.parse_args(sys.argv[1:])

    # Build a configuration in which the listed components are switched off and the
    # HTTP API is enabled on the requested port.
    config = TriblerConfig(args.statedir, config_file=Path(args.statedir) / 'triblerd.conf')
    config.set_state_dir(Path(args.statedir).absolute())
    config.set_tunnel_community_enabled(False)
    config.set_libtorrent_enabled(False)
    config.set_bootstrap_enabled(False)
    config.set_chant_enabled(False)
    config.set_torrent_checking_enabled(False)
    config.set_api_http_enabled(True)
    config.set_api_http_port(args.restapi)

    # Schedule the crawler start-up on the event loop and keep the loop running.
    loop = get_event_loop()
    coro = start_crawler(config)
    ensure_future(coro)
    loop.run_forever()
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from asyncio import Future
from binascii import unhexlify
from pathlib import Path
from random import Random
from typing import Dict

from ipv8.community import Community
Expand All @@ -15,7 +16,9 @@
from tribler_core.modules.bandwidth_accounting import EMPTY_SIGNATURE
from tribler_core.modules.bandwidth_accounting.cache import BandwidthTransactionSignCache
from tribler_core.modules.bandwidth_accounting.database import BandwidthDatabase
from tribler_core.modules.bandwidth_accounting.payload import BandwidthTransactionPayload
from tribler_core.modules.bandwidth_accounting.payload import BandwidthTransactionPayload, \
BandwidthTransactionQueryPayload
from tribler_core.modules.bandwidth_accounting.settings import BandwidthAccountingSettings
from tribler_core.modules.bandwidth_accounting.transaction import BandwidthTransactionData
from tribler_core.utilities.unicode import hexlify

Expand All @@ -34,8 +37,10 @@ def __init__(self, *args, **kwargs) -> None:
:param persistence: The database that stores transactions, will be created if not provided.
:param database_path: The path at which the database will be created. Defaults to the current working directory.
"""
self.settings = kwargs.pop('settings', BandwidthAccountingSettings())
self.database = kwargs.pop('database', None)
self.database_path = Path(kwargs.pop('database_path', ''))
self.random = Random()

super().__init__(*args, **kwargs)

Expand All @@ -46,6 +51,9 @@ def __init__(self, *args, **kwargs) -> None:
self.database = BandwidthDatabase(self.database_path, self.my_pk)

self.add_message_handler(BandwidthTransactionPayload, self.received_transaction)
self.add_message_handler(BandwidthTransactionQueryPayload, self.received_query)

self.register_task("query_peers", self.query_random_peer, interval=self.settings.outgoing_query_interval)

self.logger.info("Started bandwidth accounting community with public key %s", hexlify(self.my_pk))

Expand Down Expand Up @@ -75,22 +83,22 @@ def do_payout(self, peer: Peer, amount: int) -> Future:
with db_session:
self.database.BandwidthTransaction.insert(tx)
cache = self.request_cache.add(BandwidthTransactionSignCache(self, tx))
self.send_transaction(tx, peer, cache.number)
self.send_transaction(tx, peer.address, cache.number)

return cache.future

def send_transaction(self, transaction: BandwidthTransactionData, peer: Peer, request_id: int) -> None:
def send_transaction(self, transaction: BandwidthTransactionData, address: Address, request_id: int) -> None:
"""
Send a provided transaction to another party.
:param transaction: The BandwidthTransaction to send to the other party.
:param peer: The peer that will receive the transaction.
:param address: The IP address and port of the peer.
:param request_id: The identifier of the message, is usually provided by a request cache.
"""
payload = BandwidthTransactionPayload.from_transaction(transaction, request_id)
packet = self._ez_pack(self._prefix, 1, [payload], False)
self.endpoint.send(peer.address, packet)
self.endpoint.send(address, packet)

async def received_transaction(self, source_address: Address, data: bytes) -> None:
def received_transaction(self, source_address: Address, data: bytes) -> None:
devos50 marked this conversation as resolved.
Show resolved Hide resolved
"""
Callback when we receive a transaction from another peer.
:param source_address: The network address of the peer that has sent us the transaction.
Expand All @@ -103,38 +111,73 @@ async def received_transaction(self, source_address: Address, data: bytes) -> No
self.logger.info("Transaction %s not valid, ignoring it", tx)
return

latest_tx = self.database.get_latest_transaction(tx.public_key_a, tx.public_key_b)

if payload.public_key_b == self.my_peer.public_key.key_to_bin():
from_peer = Peer(payload.public_key_a, source_address)
if latest_tx:
# Check if the amount in the received transaction is higher than the amount of the latest one
# in the database.
if payload.amount > latest_tx.amount:
# Sign it, store it, and send it back
if payload.public_key_a == self.my_pk or payload.public_key_b == self.my_pk:
# This transaction involves this peer.
latest_tx = self.database.get_latest_transaction(tx.public_key_a, tx.public_key_b)
if payload.public_key_b == self.my_peer.public_key.key_to_bin():
from_peer = Peer(payload.public_key_a, source_address)
if latest_tx:
# Check if the amount in the received transaction is higher than the amount of the latest one
# in the database.
if payload.amount > latest_tx.amount:
# Sign it, store it, and send it back
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
self.send_transaction(tx, from_peer.address, payload.request_id)
else:
self.logger.info("Received older bandwidth transaction - sending back the latest one")
self.send_transaction(latest_tx, from_peer.address, payload.request_id)
else:
# This transaction is the first one with party A. Sign it, store it, and send it back.
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
self.send_transaction(tx, from_peer, payload.request_id)
else:
self.logger.info("Received older bandwidth transaction - sending back the latest one")
self.send_transaction(latest_tx, from_peer, payload.request_id)
else:
# This transaction is the first one with party A. Sign it, store it, and send it back.
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
from_peer = Peer(payload.public_key_a, source_address)
self.send_transaction(tx, from_peer, payload.request_id)
elif payload.public_key_a == self.my_peer.public_key.key_to_bin():
# It seems that we initiated this transaction. Check if we are waiting for it.
cache = self.request_cache.get("bandwidth-tx-sign", payload.request_id)
if not cache:
self.logger.info("Received bandwidth transaction %s without associated cache entry, ignoring it", tx)
return

if not latest_tx or (latest_tx and latest_tx.amount >= tx.amount):
self.database.BandwidthTransaction.insert(tx)

cache.future.set_result(tx)
from_peer = Peer(payload.public_key_a, source_address)
self.send_transaction(tx, from_peer.address, payload.request_id)
elif payload.public_key_a == self.my_peer.public_key.key_to_bin():
# It seems that we initiated this transaction. Check if we are waiting for it.
cache = self.request_cache.get("bandwidth-tx-sign", payload.request_id)
if not cache:
self.logger.info("Received bandwidth transaction %s without associated cache entry, ignoring it",
tx)
return

if not latest_tx or (latest_tx and latest_tx.amount >= tx.amount):
self.database.BandwidthTransaction.insert(tx)

cache.future.set_result(tx)
else:
# This transaction involves two unknown peers. We can add it to our database.
self.database.BandwidthTransaction.insert(tx)

def query_random_peer(self) -> None:
    """
    Query a random neighbouring peer and ask for their bandwidth transactions.
    Does nothing when there are no verified peers.
    """
    peers = list(self.network.verified_peers)
    if not peers:
        # Random.choice raises an IndexError on an empty sequence, so bail out early.
        return

    self.query_transactions(self.random.choice(peers))

def query_transactions(self, peer: Peer) -> None:
    """
    Send a transaction query to the given peer.
    :param peer: The peer that will receive the query.
    """
    self.logger.info("Querying the transactions of peer %s:%d", *peer.address)
    # The query payload carries no fields; it is packed under message identifier 2.
    query_packet = self._ez_pack(self._prefix, 2, [BandwidthTransactionQueryPayload()], False)
    self.endpoint.send(peer.address, query_packet)

def received_query(self, source_address: Address, data: bytes) -> None:
    """
    Callback when another peer queries us; reply with our own transactions.
    :param source_address: The network address of the peer that has sent us the query.
    :param data: The serialized, raw data in the packet (not inspected here).
    """
    # The number of transactions in the reply is capped by the configured maximum.
    max_results = self.settings.max_tx_returned_in_query
    own_transactions = self.database.get_my_latest_transactions(limit=max_results)
    self.logger.debug("Sending %d bandwidth transaction(s) to peer %s:%d", len(own_transactions), *source_address)

    # Each transaction goes out as a separate message, with 0 as request identifier.
    for own_tx in own_transactions:
        self.send_transaction(own_tx, source_address, 0)

def get_statistics(self) -> Dict:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pathlib import Path
from typing import List
from typing import List, Optional

from pony.orm import Database, count, db_session, select, sum

from tribler_core.modules.bandwidth_accounting import history, misc, transaction
from tribler_core.modules.bandwidth_accounting import history, misc, transaction as db_transaction
from tribler_core.modules.bandwidth_accounting.transaction import BandwidthTransactionData


Expand All @@ -14,14 +14,17 @@ class BandwidthDatabase:
CURRENT_DB_VERSION = 8
MAX_HISTORY_ITEMS = 100 # The maximum number of history items to store.

def __init__(self, db_path: Path, my_pub_key: bytes) -> None:
def __init__(self, db_path: Path, my_pub_key: bytes, store_all_transactions: bool = False) -> None:
"""
Sets up the persistence layer ready for use.
:param db_path: The full path of the database.
:param my_pub_key: The public key of the user operating the database.
:param store_all_transactions: Whether we store all pairwise transactions in the database. This is disabled by
default and used for data collection purposes.
"""
self.db_path = db_path
self.my_pub_key = my_pub_key
self.store_all_transactions = store_all_transactions
create_db = str(db_path) == ":memory:" or not self.db_path.is_file()
self.database = Database()

Expand All @@ -37,7 +40,7 @@ def sqlite_disable_sync(_, connection):
# pylint: enable=unused-variable

self.MiscData = misc.define_binding(self.database)
self.BandwidthTransaction = transaction.define_binding(self)
self.BandwidthTransaction = db_transaction.define_binding(self)
self.BandwidthHistory = history.define_binding(self)

self.database.bind(provider='sqlite', filename=str(db_path), create_db=create_db, timeout=120.0)
Expand All @@ -47,6 +50,32 @@ def sqlite_disable_sync(_, connection):
with db_session:
self.MiscData(name="db_version", value=str(self.CURRENT_DB_VERSION))

@db_session
def has_transaction(self, transaction: BandwidthTransactionData) -> bool:
    """
    Check whether the database already contains the given transaction.
    A transaction is looked up by both public keys and its sequence number.
    :param transaction: The transaction to look for.
    :return: True if a matching transaction exists in the database, False otherwise.
    """
    lookup_key = {
        "public_key_a": transaction.public_key_a,
        "public_key_b": transaction.public_key_b,
        "sequence_number": transaction.sequence_number,
    }
    return self.BandwidthTransaction.exists(**lookup_key)

@db_session
def get_my_latest_transactions(self, limit: Optional[int] = None) -> List[BandwidthTransactionData]:
    """
    Return stored transactions in which the operator of this database is one of the two parties.
    The query does not apply an explicit ordering.
    :param limit: An optional integer, to limit the number of results returned. Pass None to get all results.
    :return: A list with the matching transactions, converted to BandwidthTransactionData objects.
    """
    own_txs = select(tx for tx in self.BandwidthTransaction
                     if tx.public_key_a == self.my_pub_key or tx.public_key_b == self.my_pub_key)
    return [BandwidthTransactionData.from_db(row) for row in own_txs.limit(limit)]

@db_session
def get_latest_transaction(self, public_key_a: bytes, public_key_b: bytes) -> BandwidthTransactionData:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,11 @@ def from_transaction(cls, transaction: BandwidthTransaction, request_id: int) ->
transaction.timestamp,
request_id
)


@vp_compile
class BandwidthTransactionQueryPayload(VariablePayload):
    """
    Payload of an outgoing query that asks the counterparty for its transactions.
    The payload declares no fields; only the message identifier is set.
    """
    msg_id = 2
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dataclasses import dataclass


@dataclass
class BandwidthAccountingSettings:
    """
    Tunable parameters of the bandwidth accounting mechanism.
    """
    # The interval at which we send out queries to other peers, in seconds.
    outgoing_query_interval: int = 30
    # The maximum number of bandwidth transactions to return in response to a query.
    max_tx_returned_in_query: int = 10
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ def insert(cls, transaction: BandwidthTransaction) -> None:
Remove the last transaction with that specific counterparty while doing so.
:param transaction: The transaction to insert in the database.
"""
for tx in cls.select(
lambda c: c.public_key_a == transaction.public_key_a and
c.public_key_b == transaction.public_key_b):
tx.delete()
db.commit()
cls(**transaction.get_db_kwargs())
if not bandwidth_database.store_all_transactions:
# Make sure to only store the latest pairwise transaction.
for tx in cls.select(
lambda c: c.public_key_a == transaction.public_key_a and
c.public_key_b == transaction.public_key_b):
tx.delete()
db.commit()
cls(**transaction.get_db_kwargs())
elif not bandwidth_database.has_transaction(transaction):
# We store all transactions and it does not exist yet - insert it.
cls(**transaction.get_db_kwargs())

if transaction.public_key_a == bandwidth_database.my_pub_key or \
transaction.public_key_b == bandwidth_database.my_pub_key:
Expand Down
Loading