Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performing direct payouts to other Tribler clients #3776

Merged
merged 4 commits into from
Aug 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from Tribler.Core.CacheDB.sqlitecachedb import forceDBThread
from Tribler.Core.DecentralizedTracking.dht_provider import MainlineDHTProvider
from Tribler.Core.DownloadConfig import DownloadStartupConfig, DefaultDownloadStartupConfig
from Tribler.Core.Modules.payout_manager import PayoutManager
from Tribler.Core.Modules.resource_monitor import ResourceMonitor
from Tribler.Core.Modules.search_manager import SearchManager
from Tribler.Core.Modules.versioncheck_manager import VersionCheckManager
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(self):
self.credit_mining_manager = None
self.market_community = None
self.dht_community = None
self.payout_manager = None

def register(self, session, session_lock):
assert isInIOThread()
Expand Down Expand Up @@ -463,6 +465,9 @@ def init(self):
self.version_check_manager = VersionCheckManager(self.session)
self.session.set_download_states_callback(self.sesscb_states_callback)

if self.session.config.get_ipv8_enabled() and self.session.config.get_trustchain_enabled():
self.payout_manager = PayoutManager(self.trustchain_community, self.dht_community)

self.initComplete = True

def add(self, tdef, dscfg, pstate=None, setupDelay=0, hidden=False,
Expand Down Expand Up @@ -709,6 +714,13 @@ def sesscb_states_callback(self, states_list):
hops = self.session.config.get_default_number_hops()
self.update_download_hops(download, hops)

# Check the peers of this download every five seconds and add them to the payout manager when
# this peer runs a Tribler instance
if self.state_cb_count % 5 == 0 and download.get_hops() == 0 and self.payout_manager:
for peer in download.get_peerlist():
if peer["extended_version"].startswith('Tribler'):
self.payout_manager.update_peer(peer["id"].decode('hex'), infohash, peer["dtotal"])

self.previous_active_downloads = new_active_downloads
if do_checkpoint:
self.session.checkpoint_downloads()
Expand Down
11 changes: 9 additions & 2 deletions Tribler/Core/Libtorrent/LibtorrentMgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def __init__(self, tribler_session):

self.default_alert_mask = lt.alert.category_t.error_notification | lt.alert.category_t.status_notification | \
lt.alert.category_t.storage_notification | lt.alert.category_t.performance_warning | \
lt.alert.category_t.tracker_notification
lt.alert.category_t.tracker_notification | lt.alert.category_t.debug_notification
self.alert_callback = None

@blocking_call_on_reactor_thread
Expand Down Expand Up @@ -158,6 +158,10 @@ def create_session(self, hops=0, store_listen_port=True):
pe_settings = lt.pe_settings()
pe_settings.prefer_rc4 = True
ltsession.set_pe_settings(pe_settings)

mid = self.tribler_session.trustchain_keypair.key_to_hash()
settings['peer_fingerprint'] = mid
settings['handshake_client_version'] = 'Tribler/' + version_id + '/' + mid.encode('hex')
else:
settings['enable_outgoing_utp'] = True
settings['enable_incoming_utp'] = True
Expand Down Expand Up @@ -378,7 +382,6 @@ def process_alert(self, alert):
self._logger.debug("Got state_update %s for unknown torrent %s", alert_type, infohash)
continue
self.torrents[infohash][0].update_lt_status(status)
return

handle = getattr(alert, 'handle', None)
if handle and handle.is_valid():
Expand Down Expand Up @@ -410,6 +413,10 @@ def process_alert(self, alert):
else:
self._logger.debug("Removed alert for unknown torrent")

elif alert_type == 'peer_disconnected_alert' and \
self.tribler_session and self.tribler_session.lm.payout_manager:
self.tribler_session.lm.payout_manager.do_payout(alert.pid.to_string())

if self.alert_callback:
self.alert_callback(alert)

Expand Down
54 changes: 54 additions & 0 deletions Tribler/Core/Modules/payout_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging

from Tribler.Core.Modules.wallet.tc_wallet import TrustchainWallet


class PayoutManager(object):
"""
This manager is responsible for keeping track of known Tribler peers and doing (zero-hop) payouts.
"""

def __init__(self, trustchain, dht):
self.logger = logging.getLogger(self.__class__.__name__)
self.bandwidth_wallet = TrustchainWallet(trustchain)
self.dht = dht
self.tribler_peers = {}

def do_payout(self, mid):
"""
Perform a payout to a given mid. First, determine the outstanding balance. Then resolve the node in the DHT.
"""
if mid not in self.tribler_peers:
self.logger.warning("Mid %s not found in known peers, not doing payout!", mid.encode('hex'))
return

total_bytes = 0
for balance in self.tribler_peers[mid].itervalues():
total_bytes += balance

def on_nodes(nodes):
self.logger.debug("Received %d nodes for DHT lookup", len(nodes))
if nodes:
self.bandwidth_wallet.trustchain.sign_block(nodes[0],
public_key=nodes[0].public_key.key_to_bin(),
block_type='tribler_bandwidth',
transaction={'up': 0, 'down': total_bytes})

if total_bytes >= 1024 * 1024: # Do at least 1MB payouts
self.logger.info("Doing direct payout to %s (%d bytes)", mid.encode('hex'), total_bytes)
self.dht.connect_peer(mid).addCallback(on_nodes)

# Remove the outstanding bytes; otherwise we will payout again
self.tribler_peers.pop(mid, None)

def update_peer(self, mid, infohash, balance):
"""
Update a peer with a specific mid for a specific infohash.
"""
self.logger.debug("Updating peer with mid %s and ih %s (balance: %d)", mid.encode('hex'),
infohash.encode('hex'), balance)

if mid not in self.tribler_peers:
self.tribler_peers[mid] = {}

self.tribler_peers[mid][infohash] = balance
4 changes: 3 additions & 1 deletion Tribler/Core/Modules/wallet/bandwidth_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class TriblerBandwidthBlock(TrustChainBlock):
"""

@classmethod
def create(cls, block_type, transaction, database, public_key, link=None, link_pk=None):
def create(cls, block_type, transaction, database, public_key, link=None, link_pk=None, additional_info=None):
"""
Create an empty next block.
:param block_type: the type of the block to be created
Expand All @@ -17,6 +17,8 @@ def create(cls, block_type, transaction, database, public_key, link=None, link_p
:param public_key: the public key to use for this block
:param link: optionally create the block as a linked block to this block
:param link_pk: the public key of the counterparty in this transaction
:param additional_info: additional information, which has a higher priority than the
transaction when link exists
:return: A newly created block
"""
latest_bw_block = database.get_latest(public_key, block_type='tribler_bandwidth')
Expand Down
16 changes: 8 additions & 8 deletions Tribler/Test/Community/popularity/test_payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_content_subscription(self):
subscribe = True
identifier = 123123
subscription = ContentSubscription(identifier, subscribe)
serialized = self.serializer.pack_multiple(subscription.to_pack_list())
serialized = self.serializer.pack_multiple(subscription.to_pack_list())[0]

# Deserialize and test it
(deserialized, _) = self.serializer.unpack_multiple(ContentSubscription.format_list, serialized)
Expand All @@ -41,7 +41,7 @@ def test_torrent_health_payload(self):
timestamp = 123123123

health_payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp)
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())[0]

# Deserialize and test it
(deserialized, _) = self.serializer.unpack_multiple(TorrentHealthPayload.format_list, serialized)
Expand All @@ -61,7 +61,7 @@ def test_channel_health_payload(self):
timestamp = 123123123

health_payload = ChannelHealthPayload(channel_id, num_votes, num_torrents, swarm_size_sum, timestamp)
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())[0]

# Deserialize and test it
(deserialized, _) = self.serializer.unpack_multiple(ChannelHealthPayload.format_list, serialized)
Expand All @@ -83,7 +83,7 @@ def test_torrent_info_response_payload_for_default_values(self):
comment = None

health_payload = TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment)
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())
serialized = self.serializer.pack_multiple(health_payload.to_pack_list())[0]

# Deserialize and test it
(deserialized, _) = self.serializer.unpack_multiple(TorrentInfoResponsePayload.format_list, serialized)
Expand Down Expand Up @@ -121,9 +121,9 @@ def test_search_result_payload_serialization(self):
# Serialize the results
results = ''
for item in sample_items:
results += self.serializer.pack_multiple(item.to_pack_list())
results += self.serializer.pack_multiple(item.to_pack_list())[0]
serialized_results = self.serializer.pack_multiple(
SearchResponsePayload(identifier, response_type, results).to_pack_list())
SearchResponsePayload(identifier, response_type, results).to_pack_list())[0]

# De-serialize the response payload and check the identifier and get the results
response_format = SearchResponsePayload.format_list
Expand Down Expand Up @@ -172,7 +172,7 @@ def test_content_info_request(self):

# Serialize request
in_request = ContentInfoRequest(identifier, content_type, query_list, limit)
serialized_request = self.serializer.pack_multiple(in_request.to_pack_list())
serialized_request = self.serializer.pack_multiple(in_request.to_pack_list())[0]

# Deserialize request and test it
(deserialized_request, _) = self.serializer.unpack_multiple(ContentInfoRequest.format_list, serialized_request)
Expand All @@ -192,7 +192,7 @@ def test_content_info_response(self):

# Serialize request
in_response = ContentInfoResponse(identifier, content_type, response, pagination)
serialized_response = self.serializer.pack_multiple(in_response.to_pack_list())
serialized_response = self.serializer.pack_multiple(in_response.to_pack_list())[0]

# Deserialize request and test it
(deserialized_response, _) = self.serializer.unpack_multiple(ContentInfoResponse.format_list,
Expand Down
37 changes: 34 additions & 3 deletions Tribler/Test/Core/Libtorrent/test_libtorrent_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ def setUp(self, annotate=True):
yield super(TestLibtorrentMgr, self).setUp(annotate)

self.tribler_session = MockObject()
self.tribler_session.lm = MockObject()
self.tribler_session.notifier = Notifier()
self.tribler_session.state_dir = self.session_base_dir
self.tribler_session.trustchain_keypair = MockObject()
self.tribler_session.trustchain_keypair.key_to_hash = lambda: 'a' * 20

self.tribler_session.config = MockObject()
self.tribler_session.config.get_libtorrent_utp = lambda: True
Expand Down Expand Up @@ -418,15 +421,43 @@ def dl_from_tdef(tdef, _):

self.assertTrue(os.path.isfile(filename))

@deferred(timeout=5)
def test_callback_on_alert(self):
"""
Test whether the alert callback is called when a libtorrent alert is posted
"""
self.ltmgr.default_alert_mask = 0xffffffff
mutable_container = [False]
test_deferred = Deferred()

def callback(*args):
mutable_container[0] = True
self.ltmgr.alert_callback = None
test_deferred.callback(None)

callback.called = False
self.ltmgr.alert_callback = callback
self.ltmgr.initialize()
self.ltmgr._task_process_alerts()
self.assertTrue(mutable_container[0])
return test_deferred

def test_payout_on_disconnect(self):
"""
Test whether a payout is initialized when a peer disconnects
"""
class peer_disconnected_alert(object):
def __init__(self):
self.pid = MockObject()
self.pid.to_string = lambda: 'a' * 20

def mocked_do_payout(mid):
self.assertEqual(mid, 'a' * 20)
mocked_do_payout.called = True
mocked_do_payout.called = False

disconnect_alert = peer_disconnected_alert()
self.ltmgr.tribler_session.lm.payout_manager = MockObject()
self.ltmgr.tribler_session.lm.payout_manager.do_payout = mocked_do_payout
self.ltmgr.initialize()
self.ltmgr.get_session(0).pop_alerts = lambda: [disconnect_alert]
self.ltmgr._task_process_alerts()

self.assertTrue(mocked_do_payout.called)
54 changes: 54 additions & 0 deletions Tribler/Test/Core/Modules/test_payout_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from twisted.internet.defer import inlineCallbacks, succeed

from Tribler.Core.Modules.payout_manager import PayoutManager
from Tribler.Test.Core.base_test import TriblerCoreTest, MockObject
from Tribler.pyipv8.ipv8.util import blocking_call_on_reactor_thread


class TestPayoutManager(TriblerCoreTest):
"""
This class contains various tests for the payout manager.
"""

@blocking_call_on_reactor_thread
@inlineCallbacks
def setUp(self, annotate=True):
yield super(TestPayoutManager, self).setUp(annotate=annotate)

fake_tc = MockObject()
fake_tc.add_listener = lambda *_: None

fake_response_peer = MockObject()
fake_response_peer.public_key = MockObject()
fake_response_peer.public_key.key_to_bin = lambda: 'a' * 64
fake_dht = MockObject()
fake_dht.connect_peer = lambda *_: succeed([fake_response_peer])

self.payout_manager = PayoutManager(fake_tc, fake_dht)

def test_do_payout(self):
"""
Test doing a payout
"""
self.payout_manager.do_payout('a') # Does not exist
self.payout_manager.update_peer('b', 'c', 10 * 1024 * 1024)
self.payout_manager.update_peer('b', 'd', 1337)

def mocked_sign_block(*_, **kwargs):
tx = kwargs.pop('transaction')
self.assertEqual(tx['down'], 10 * 1024 * 1024 + 1337)

self.payout_manager.bandwidth_wallet.trustchain.sign_block = mocked_sign_block
self.payout_manager.do_payout('b')

def test_update_peer(self):
"""
Test the updating of a specific peer
"""
self.payout_manager.update_peer('a', 'b', 1337)
self.assertIn('a', self.payout_manager.tribler_peers)
self.assertIn('b', self.payout_manager.tribler_peers['a'])
self.assertEqual(self.payout_manager.tribler_peers['a']['b'], 1337)

self.payout_manager.update_peer('a', 'b', 1338)
self.assertEqual(self.payout_manager.tribler_peers['a']['b'], 1338)
Loading