Skip to content

Commit

Permalink
Added query feature to bandwidth accounting
Browse files Browse the repository at this point in the history
  • Loading branch information
devos50 committed Oct 28, 2020
1 parent ada27e4 commit 7984adf
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 49 deletions.
15 changes: 15 additions & 0 deletions build/systemd/bandwidth-crawler.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[Unit]
# systemd takes the Description= value literally, so it is written without quotes.
Description=Bandwidth crawler
# After= only orders start-up; Wants= is what pulls network-online.target in.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=crawler
Group=crawler
Restart=always
# The relative PYTHONPATH entries and the relative script path in ExecStart
# are meant to be resolved from WorkingDirectory.
Environment=PYTHONPATH=./src/pyipv8:./src/tribler-common:./src/tribler-core
WorkingDirectory=/opt/tribler
# EXTRA_CRAWLER_ARGS is not defined in this unit; when unset it expands to no arguments.
ExecStart=/usr/bin/python3 src/tribler-core/run_bandwidth_crawler.py --statedir /var/lib/crawler $EXTRA_CRAWLER_ARGS

[Install]
WantedBy=multi-user.target
71 changes: 71 additions & 0 deletions src/tribler-core/run_bandwidth_crawler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
This executable script starts a Tribler instance and joins the BandwidthAccountingCommunity.
"""
import argparse
import sys
from asyncio import ensure_future, get_event_loop
from pathlib import Path

from ipv8.loader import IPv8CommunityLoader

from tribler_core.config.tribler_config import TriblerConfig
from tribler_core.modules.bandwidth_accounting.database import BandwidthDatabase
from tribler_core.modules.bandwidth_accounting.settings import BandwidthAccountingSettings
from tribler_core.modules.ipv8_module_catalog import BandwidthCommunityLauncher
from tribler_core.session import Session


class PortAction(argparse.Action):
    """
    Argparse action that validates a port number before storing it on the namespace.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """
        Validate the supplied port and store it as an integer.
        :param parser: The argument parser that invoked this action.
        :param namespace: The namespace object on which the parsed value is stored.
        :param values: The port as given on the command line; may be a string or an integer.
        :param option_string: The option string that triggered this action.
        :raises argparse.ArgumentError: If the value is not an integer in the range 1..65535.
        """
        # The value may arrive as a string (e.g. when the option is declared with type=str);
        # comparing a string against integers would raise a TypeError, so convert it first.
        try:
            port = int(values)
        except (TypeError, ValueError):
            raise argparse.ArgumentError(self, "Invalid port number") from None

        if not 0 < port < 2**16:
            raise argparse.ArgumentError(self, "Invalid port number")
        setattr(namespace, self.dest, port)


class BandwidthCommunityCrawlerLauncher(BandwidthCommunityLauncher):
    """
    Launcher that configures the bandwidth accounting community for crawling.
    """

    def get_kwargs(self, session):
        """
        Build the keyword arguments with which the community is created.
        :param session: The session that provides the state directory and the trustchain key pair.
        :return: A dictionary with the database, the settings and the maximum number of peers.
        """
        # Use a query interval of 5 instead of the default declared in the settings class.
        crawler_settings = BandwidthAccountingSettings()
        crawler_settings.outgoing_query_interval = 5

        # The database is created with store_all_transactions enabled.
        db_path = session.config.get_state_dir() / "sqlite" / "bandwidth.db"
        public_key = session.trustchain_keypair.pub().key_to_bin()
        crawler_database = BandwidthDatabase(db_path, public_key, store_all_transactions=True)

        kwargs = {
            "database": crawler_database,
            "settings": crawler_settings,
            "max_peers": -1,  # NOTE(review): presumably lifts the peer limit - confirm
        }
        return kwargs


async def start_crawler(config):
    """
    Create a session from the given configuration and start it with the crawler launcher installed.
    :param config: The configuration to create the session with.
    """
    session = Session(config)

    # Replace the community loader of the session by one that only knows the crawler launcher.
    crawler_loader = IPv8CommunityLoader()
    crawler_loader.set_launcher(BandwidthCommunityCrawlerLauncher())
    session.ipv8_community_loader = crawler_loader

    await session.start()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Start a crawler in the bandwidth accounting community')
    parser.add_argument('--statedir', '-s', default='bw_crawler', type=str, help='Use an alternate statedir')
    # The port is parsed as an integer: PortAction range-checks it numerically and the default (8085)
    # is an integer too, so the configured port has the same type whether or not the option is given.
    parser.add_argument('--restapi', '-p', default=8085, type=int, help='Use an alternate port for the REST API',
                        action=PortAction, metavar='{0..65535}')
    args = parser.parse_args(sys.argv[1:])

    # Only the HTTP API is explicitly enabled here; the other components are switched off.
    config = TriblerConfig(args.statedir, config_file=Path(args.statedir) / 'triblerd.conf')
    config.set_state_dir(Path(args.statedir).absolute())
    config.set_tunnel_community_enabled(False)
    config.set_libtorrent_enabled(False)
    config.set_bootstrap_enabled(False)
    config.set_chant_enabled(False)
    config.set_torrent_checking_enabled(False)
    config.set_api_http_enabled(True)
    config.set_api_http_port(args.restapi)

    # Schedule the crawler start-up and keep the event loop running.
    loop = get_event_loop()
    coro = start_crawler(config)
    ensure_future(coro)
    loop.run_forever()
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from asyncio import Future
from binascii import unhexlify
from pathlib import Path
from random import choice
from typing import Dict

from ipv8.community import Community
Expand All @@ -15,7 +16,9 @@
from tribler_core.modules.bandwidth_accounting import EMPTY_SIGNATURE
from tribler_core.modules.bandwidth_accounting.cache import BandwidthTransactionSignCache
from tribler_core.modules.bandwidth_accounting.database import BandwidthDatabase
from tribler_core.modules.bandwidth_accounting.payload import BandwidthTransactionPayload
from tribler_core.modules.bandwidth_accounting.payload import BandwidthTransactionPayload, \
BandwidthTransactionQueryPayload
from tribler_core.modules.bandwidth_accounting.settings import BandwidthAccountingSettings
from tribler_core.modules.bandwidth_accounting.transaction import BandwidthTransactionData
from tribler_core.utilities.unicode import hexlify

Expand All @@ -34,6 +37,7 @@ def __init__(self, *args, **kwargs) -> None:
:param persistence: The database that stores transactions, will be created if not provided.
:param database_path: The path at which the database will be created. Defaults to the current working directory.
"""
self.settings = kwargs.pop('settings', BandwidthAccountingSettings())
self.database = kwargs.pop('database', None)
self.database_path = Path(kwargs.pop('database_path', ''))

Expand All @@ -46,6 +50,9 @@ def __init__(self, *args, **kwargs) -> None:
self.database = BandwidthDatabase(self.database_path, self.my_pk)

self.add_message_handler(BandwidthTransactionPayload, self.received_transaction)
self.add_message_handler(BandwidthTransactionQueryPayload, self.received_query)

self.register_task("query_peers", self.query_random_peer, interval=self.settings.outgoing_query_interval)

self.logger.info("Started bandwidth accounting community with public key %s", hexlify(self.my_pk))

Expand Down Expand Up @@ -75,22 +82,22 @@ def do_payout(self, peer: Peer, amount: int) -> Future:
with db_session:
self.database.BandwidthTransaction.insert(tx)
cache = self.request_cache.add(BandwidthTransactionSignCache(self, tx))
self.send_transaction(tx, peer, cache.number)
self.send_transaction(tx, peer.address, cache.number)

return cache.future

def send_transaction(self, transaction: BandwidthTransactionData, peer: Peer, request_id: int) -> None:
def send_transaction(self, transaction: BandwidthTransactionData, address: Address, request_id: int) -> None:
"""
Send a provided transaction to another party.
:param transaction: The BandwidthTransaction to send to the other party.
:param peer: The peer that will receive the transaction.
:param address: The IP address and port of the peer that will receive the transaction.
:param request_id: The identifier of the message, is usually provided by a request cache.
"""
payload = BandwidthTransactionPayload.from_transaction(transaction, request_id)
packet = self._ez_pack(self._prefix, 1, [payload], False)
self.endpoint.send(peer.address, packet)
self.endpoint.send(address, packet)

async def received_transaction(self, source_address: Address, data: bytes) -> None:
def received_transaction(self, source_address: Address, data: bytes) -> None:
"""
Callback when we receive a transaction from another peer.
:param source_address: The network address of the peer that has sent us the transaction.
Expand All @@ -103,38 +110,73 @@ async def received_transaction(self, source_address: Address, data: bytes) -> No
self.logger.info("Transaction %s not valid, ignoring it", tx)
return

latest_tx = self.database.get_latest_transaction(tx.public_key_a, tx.public_key_b)

if payload.public_key_b == self.my_peer.public_key.key_to_bin():
from_peer = Peer(payload.public_key_a, source_address)
if latest_tx:
# Check if the amount in the received transaction is higher than the amount of the latest one
# in the database.
if payload.amount > latest_tx.amount:
# Sign it, store it, and send it back
if payload.public_key_a == self.my_pk or payload.public_key_b == self.my_pk:
# This transaction involves this peer.
latest_tx = self.database.get_latest_transaction(tx.public_key_a, tx.public_key_b)
if payload.public_key_b == self.my_peer.public_key.key_to_bin():
from_peer = Peer(payload.public_key_a, source_address)
if latest_tx:
# Check if the amount in the received transaction is higher than the amount of the latest one
# in the database.
if payload.amount > latest_tx.amount:
# Sign it, store it, and send it back
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
self.send_transaction(tx, from_peer.address, payload.request_id)
else:
self.logger.info("Received older bandwidth transaction - sending back the latest one")
self.send_transaction(latest_tx, from_peer.address, payload.request_id)
else:
# This transaction is the first one with party A. Sign it, store it, and send it back.
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
self.send_transaction(tx, from_peer, payload.request_id)
else:
self.logger.info("Received older bandwidth transaction - sending back the latest one")
self.send_transaction(latest_tx, from_peer, payload.request_id)
else:
# This transaction is the first one with party A. Sign it, store it, and send it back.
tx.sign(self.my_peer.key, as_a=False)
self.database.BandwidthTransaction.insert(tx)
from_peer = Peer(payload.public_key_a, source_address)
self.send_transaction(tx, from_peer, payload.request_id)
elif payload.public_key_a == self.my_peer.public_key.key_to_bin():
# It seems that we initiated this transaction. Check if we are waiting for it.
cache = self.request_cache.get("bandwidth-tx-sign", payload.request_id)
if not cache:
self.logger.info("Received bandwidth transaction %s without associated cache entry, ignoring it", tx)
return

if not latest_tx or (latest_tx and latest_tx.amount >= tx.amount):
self.database.BandwidthTransaction.insert(tx)

cache.future.set_result(tx)
from_peer = Peer(payload.public_key_a, source_address)
self.send_transaction(tx, from_peer.address, payload.request_id)
elif payload.public_key_a == self.my_peer.public_key.key_to_bin():
# It seems that we initiated this transaction. Check if we are waiting for it.
cache = self.request_cache.get("bandwidth-tx-sign", payload.request_id)
if not cache:
self.logger.info("Received bandwidth transaction %s without associated cache entry, ignoring it",
tx)
return

if not latest_tx or (latest_tx and latest_tx.amount >= tx.amount):
self.database.BandwidthTransaction.insert(tx)

cache.future.set_result(tx)
else:
# This transaction involves two unknown peers. We can add it to our database.
self.database.BandwidthTransaction.insert(tx)

def query_random_peer(self) -> None:
    """
    Query a random neighbouring peer and ask for their bandwidth transactions.
    Does nothing when there are no verified peers.
    """
    peers = list(self.network.verified_peers)
    # choice() raises an IndexError on an empty sequence, so only pick a peer when there is one.
    random_peer = choice(peers) if peers else None
    if random_peer:
        self.query_transactions(random_peer)

def query_transactions(self, peer: Peer) -> None:
    """
    Send a bandwidth transaction query to the given peer.
    :param peer: The peer to send the query to.
    """
    destination = peer.address
    self.logger.info("Querying the transactions of peer %s:%d", *destination)

    # The query is packed with message identifier 2.
    query_packet = self._ez_pack(self._prefix, 2, [BandwidthTransactionQueryPayload()], False)
    self.endpoint.send(destination, query_packet)

def received_query(self, source_address: Address, data: bytes) -> None:
    """
    Callback when another peer queries us; reply with our own transactions.
    :param source_address: The network address of the peer that has sent us the query.
    :param data: The serialized, raw data in the packet.
    """
    max_results = self.settings.max_tx_returned_in_query
    transactions = self.database.get_my_latest_transactions(limit=max_results)
    self.logger.debug("Sending %d bandwidth transaction(s) to peer %s:%d", len(transactions), *source_address)

    # Each transaction is sent back separately, with request identifier 0.
    for transaction in transactions:
        self.send_transaction(transaction, source_address, 0)

def get_statistics(self) -> Dict:
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from typing import List
from typing import List, Optional

from pony.orm import Database, count, db_session, select, sum

Expand All @@ -14,14 +14,17 @@ class BandwidthDatabase:
CURRENT_DB_VERSION = 8
MAX_HISTORY_ITEMS = 100 # The maximum number of history items to store.

def __init__(self, db_path: Path, my_pub_key: bytes) -> None:
def __init__(self, db_path: Path, my_pub_key: bytes, store_all_transactions: bool = False) -> None:
"""
Sets up the persistence layer ready for use.
:param db_path: The full path of the database.
:param my_pub_key: The public key of the user operating the database.
:param store_all_transactions: Whether we store all pairwise transactions in the database. This is disabled by
default and used for data collection purposes.
"""
self.db_path = db_path
self.my_pub_key = my_pub_key
self.store_all_transactions = store_all_transactions
create_db = str(db_path) == ":memory:" or not self.db_path.is_file()
self.database = Database()

Expand All @@ -36,6 +39,32 @@ def __init__(self, db_path: Path, my_pub_key: bytes) -> None:
with db_session:
self.MiscData(name="db_version", value=str(self.CURRENT_DB_VERSION))

@db_session
def has_transaction(self, transaction: BandwidthTransactionData) -> bool:
    """
    Check whether the given transaction is already stored in the database.
    A transaction is looked up by both public keys and its sequence number.
    :param transaction: The transaction to look for.
    :return: True if a matching transaction exists in the database, False otherwise.
    """
    lookup = {
        "public_key_a": transaction.public_key_a,
        "public_key_b": transaction.public_key_b,
        "sequence_number": transaction.sequence_number,
    }
    return self.BandwidthTransaction.exists(**lookup)

@db_session
def get_my_latest_transactions(self, limit: Optional[int] = None) -> List[BandwidthTransactionData]:
    """
    Return the stored transactions in which our own public key is either party A or party B.
    No explicit ordering is applied to the results.
    :param limit: An optional integer, to limit the number of results returned. Pass None to get all results.
    :return: A list with the matching transactions, converted from their database representation.
    """
    own_txs = select(tx for tx in self.BandwidthTransaction
                     if tx.public_key_a == self.my_pub_key or tx.public_key_b == self.my_pub_key)
    return [BandwidthTransactionData.from_db(row) for row in own_txs.limit(limit)]

@db_session
def get_latest_transaction(self, public_key_a: bytes, public_key_b: bytes) -> BandwidthTransactionData:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ def from_transaction(cls, transaction: BandwidthTransaction, request_id: int) ->
transaction.timestamp,
request_id
)


@vp_compile
class BandwidthTransactionQueryPayload(VariablePayload):
    """
    Payload of a query for bandwidth transactions.
    It only declares a message identifier and no fields of its own.
    """
    msg_id = 2  # The message identifier of the query.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dataclasses import dataclass


@dataclass
class BandwidthAccountingSettings:
    """
    Tunable parameters of the bandwidth accounting mechanism.
    """
    # The number of seconds between two outgoing queries to other peers.
    outgoing_query_interval: int = 30

    # The maximum number of bandwidth transactions returned in response to a single query.
    max_tx_returned_in_query: int = 10
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ def insert(cls, transaction: BandwidthTransaction) -> None:
Remove the last transaction with that specific counterparty while doing so.
:param transaction: The transaction to insert in the database.
"""
for tx in cls.select(
lambda c: c.public_key_a == transaction.public_key_a and
c.public_key_b == transaction.public_key_b):
tx.delete()
db.commit()
cls(**transaction.get_db_kwargs())
if not bandwidth_database.store_all_transactions:
# Make sure to only store the latest pairwise transaction.
for tx in cls.select(
lambda c: c.public_key_a == transaction.public_key_a and
c.public_key_b == transaction.public_key_b):
tx.delete()
db.commit()
cls(**transaction.get_db_kwargs())
elif not bandwidth_database.has_transaction(transaction):
# We store all transactions and it does not exist yet - insert it.
cls(**transaction.get_db_kwargs())

if transaction.public_key_a == bandwidth_database.my_pub_key or \
transaction.public_key_b == bandwidth_database.my_pub_key:
Expand Down
Loading

0 comments on commit 7984adf

Please sign in to comment.