Skip to content

Commit

Permalink
Implemented payouts
Browse files Browse the repository at this point in the history
  • Loading branch information
devos50 committed Mar 4, 2018
1 parent 77fb0d1 commit bfa1394
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 95 deletions.
3 changes: 2 additions & 1 deletion Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ def load_ipv8_overlays(self):
tribler_session=self.session,
dht_provider=MainlineDHTProvider(
self.mainline_dht,
self.session.config.get_dispersy_port()))
self.session.config.get_dispersy_port()),
triblerchain_community=self.triblerchain_community)
self.ipv8.overlays.append(self.tunnel_community)
self.ipv8.strategies.append((RandomWalk(self.tunnel_community), 20))

Expand Down
39 changes: 1 addition & 38 deletions Tribler/Test/Community/Triblerchain/test_community.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,15 @@
from Tribler.Test.Core.base_test import MockObject
from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.community.triblerchain.block import TriblerChainBlock
from Tribler.community.triblerchain.community import TriblerChainCommunity, TriblerChainCrawlerCommunity
from Tribler.pyipv8.ipv8.messaging.anonymization.tunnel import Circuit


class TestTriblerChainCommunity(TestBase):

def setUp(self):
super(TestTriblerChainCommunity, self).setUp()
self.initialize(TriblerChainCommunity, 2)
self.nodes[0].overlay.SIGN_DELAY = 0
self.nodes[1].overlay.SIGN_DELAY = 0

def create_node(self):
return MockIPv8(u"curve25519", TriblerChainCommunity, working_directory=u":memory:")

@twisted_wrapper
def test_on_tunnel_remove(self):
"""
Test whether a message is created when a tunnel has been removed
"""
tribler_session = MockObject()
tribler_session.lm = MockObject()
tribler_session.lm.tunnel_community = MockObject()
tribler_session.lm.tunnel_community.network = self.nodes[0].overlay.network
self.nodes[0].overlay.tribler_session = tribler_session

circuit = Circuit(3)
circuit.bytes_up = 50 * 1024 * 1024
self.nodes[0].overlay.on_tunnel_remove(None, None, circuit,
self.nodes[0].overlay.network.verified_peers[0].address)

yield self.sleep(time=0.1)

my_pk = self.nodes[0].overlay.my_peer.public_key.key_to_bin()
self.assertTrue(self.nodes[0].overlay.persistence.get(my_pk, 1))
from Tribler.community.triblerchain.community import TriblerChainCrawlerCommunity


class TestTriblerChainCrawlerCommunity(TestBase):

def setUp(self):
super(TestTriblerChainCrawlerCommunity, self).setUp()
self.initialize(TriblerChainCrawlerCommunity, 2)
self.nodes[0].overlay.SIGN_DELAY = 0
self.nodes[1].overlay.SIGN_DELAY = 0

def create_node(self):
return MockIPv8(u"curve25519", TriblerChainCrawlerCommunity, working_directory=u":memory:")
Expand Down
139 changes: 138 additions & 1 deletion Tribler/Test/Community/Tunnel/test_community.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,94 @@
from Tribler.Test.Core.base_test import MockObject
from Tribler.Test.ipv8_base import TestBase
from Tribler.Test.mocking.exit_socket import MockTunnelExitSocket
from Tribler.Test.mocking.ipv8 import MockIPv8
from Tribler.Test.util.ipv8_util import twisted_wrapper
from Tribler.community.triblerchain.community import TriblerChainCommunity
from Tribler.community.triblertunnel.community import TriblerTunnelCommunity
from Tribler.pyipv8.ipv8.messaging.anonymization.tunnel import CIRCUIT_TYPE_RENDEZVOUS
from Tribler.pyipv8.ipv8.peer import Peer
from Tribler.pyipv8.ipv8.util import blocking_call_on_reactor_thread
from twisted.internet.defer import inlineCallbacks


# Map of info_hash -> peer list
global_dht_services = {}


class MockDHTProvider(object):

def __init__(self, address):
self.address = address

def lookup(self, info_hash, cb):
if info_hash in global_dht_services:
cb(info_hash, global_dht_services[info_hash], None)

def announce(self, info_hash):
if info_hash in global_dht_services:
global_dht_services[info_hash].append(self.address)
else:
global_dht_services[info_hash] = [self.address]


class TestTriblerTunnelCommunity(TestBase):

def setUp(self):
super(TestTriblerTunnelCommunity, self).setUp()
self.private_nodes = []
self.initialize(TriblerTunnelCommunity, 1)

def tearDown(self):
super(TestTriblerTunnelCommunity, self).tearDown()

for node in self.private_nodes:
node.unload()

def create_node(self):
return MockIPv8(u"curve25519", TriblerTunnelCommunity)
mock_ipv8 = MockIPv8(u"curve25519", TriblerTunnelCommunity, socks_listen_ports=[])
mock_ipv8.overlay._use_main_thread = False

# Load the TriblerChain community
overlay = TriblerChainCommunity(mock_ipv8.my_peer, mock_ipv8.endpoint, mock_ipv8.network,
working_directory=u":memory:")
mock_ipv8.overlay.triblerchain_community = overlay
mock_ipv8.overlay.dht_provider = MockDHTProvider(mock_ipv8.endpoint.wan_address)

return mock_ipv8

@inlineCallbacks
def create_intro(self, node_nr, service):
"""
Create an 1 hop introduction point for some node for some service.
"""
lookup_service = self.nodes[node_nr].overlay.get_lookup_info_hash(service)
self.nodes[node_nr].overlay.hops[lookup_service] = 1
self.nodes[node_nr].overlay.create_introduction_point(lookup_service)

yield self.deliver_messages()

for node in self.nodes:
exit_sockets = node.overlay.exit_sockets
for exit_socket in exit_sockets:
exit_sockets[exit_socket] = MockTunnelExitSocket(exit_sockets[exit_socket])

@inlineCallbacks
def assign_exit_node(self, node_nr):
"""
Give a node a dedicated exit node to play with.
"""
exit_node = self.create_node()
self.private_nodes.append(exit_node)
exit_node.overlay.settings.become_exitnode = True
public_peer = Peer(exit_node.my_peer.public_key, exit_node.my_peer.address)
self.nodes[node_nr].network.add_verified_peer(public_peer)
self.nodes[node_nr].network.discover_services(public_peer, exit_node.overlay.master_peer.mid)
self.nodes[node_nr].overlay.update_exit_candidates(public_peer, True)
self.nodes[node_nr].overlay.build_tunnels(1)
yield self.deliver_messages()
exit_sockets = exit_node.overlay.exit_sockets
for exit_socket in exit_sockets:
exit_sockets[exit_socket] = MockTunnelExitSocket(exit_sockets[exit_socket])

@blocking_call_on_reactor_thread
def test_download_remove(self):
Expand Down Expand Up @@ -143,3 +219,64 @@ def test_update_torrent(self):
# Test adding peers
self.nodes[0].overlay.bittorrent_peers['a'] = {4}
self.nodes[0].overlay.update_torrent(peers, mock_handle, 'a')

@twisted_wrapper
def test_payouts(self):
"""
Test whether nodes are correctly paid after transferring data
"""
self.add_node_to_experiment(self.create_node())
self.add_node_to_experiment(self.create_node())

# Build a tunnel
self.nodes[2].overlay.settings.become_exitnode = True
yield self.introduce_nodes()
self.nodes[0].overlay.build_tunnels(2)
yield self.deliver_messages()

self.assertEqual(self.nodes[0].overlay.tunnels_ready(2), 1.0)

# Destroy the circuit
for circuit_id, circuit in self.nodes[0].overlay.circuits.items():
circuit.bytes_down = 250 * 1024 * 1024
self.nodes[0].overlay.remove_circuit(circuit_id, destroy=True)

yield self.deliver_messages()

# Verify whether the downloader (node 0) correctly paid the relay and exit nodes.
self.assertTrue(self.nodes[0].overlay.triblerchain_community.get_bandwidth_tokens() < 0)
self.assertTrue(self.nodes[1].overlay.triblerchain_community.get_bandwidth_tokens() > 0)
self.assertTrue(self.nodes[2].overlay.triblerchain_community.get_bandwidth_tokens() > 0)

@twisted_wrapper
def test_payouts_e2e(self):
"""
Check if payouts work for an e2e-linked circuit
"""
self.add_node_to_experiment(self.create_node())
self.add_node_to_experiment(self.create_node())

service = '0' * 20

self.nodes[0].overlay.register_service(service, 1, None, 0)

yield self.introduce_nodes()
yield self.create_intro(2, service)
yield self.assign_exit_node(0)

self.nodes[0].overlay.do_dht_lookup(service)

yield self.deliver_messages(timeout=.2)

# Destroy the e2e-circuit
for circuit_id, circuit in self.nodes[0].overlay.circuits.items():
if circuit.ctype == CIRCUIT_TYPE_RENDEZVOUS:
circuit.bytes_down = 250 * 1024 * 1024
self.nodes[0].overlay.remove_circuit(circuit_id, destroy=True)

yield self.deliver_messages()

# Verify whether the downloader (node 0) correctly paid the subsequent nodes.
self.assertTrue(self.nodes[0].overlay.triblerchain_community.get_bandwidth_tokens() < 0)
self.assertTrue(self.nodes[1].overlay.triblerchain_community.get_bandwidth_tokens() > 0)
self.assertTrue(self.nodes[2].overlay.triblerchain_community.get_bandwidth_tokens() > 0)
2 changes: 2 additions & 0 deletions Tribler/Test/Core/Modules/RestApi/test_debug_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from Tribler.Test.Core.Modules.RestApi.base_api_test import AbstractApiTest
from Tribler.Test.Core.base_test import MockObject
from Tribler.Test.twisted_thread import deferred
from Tribler.pyipv8.ipv8.messaging.anonymization.tunnel import CIRCUIT_TYPE_DATA


class TestCircuitDebugEndpoint(AbstractApiTest):
Expand Down Expand Up @@ -40,6 +41,7 @@ def test_get_circuits(self):
mock_circuit.hops = [mock_hop]
mock_circuit.sock_addr = ("1.1.1.1", 1234)
mock_circuit.circuit_id = 1234
mock_circuit.ctype = CIRCUIT_TYPE_DATA
mock_circuit.destroy = lambda: None

self.session.lm.tunnel_community.circuits = {1234: mock_circuit}
Expand Down
40 changes: 40 additions & 0 deletions Tribler/Test/mocking/exit_socket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import absolute_import

from Tribler.Test.mocking.endpoint import AutoMockEndpoint
from Tribler.pyipv8.ipv8.messaging.anonymization.tunnel import TunnelExitSocket, DataChecker
from Tribler.pyipv8.ipv8.messaging.interfaces.endpoint import EndpointListener
from twisted.internet.defer import succeed



class MockTunnelExitSocket(TunnelExitSocket, EndpointListener):

def __init__(self, parent):
self.endpoint = AutoMockEndpoint()
self.endpoint.open()

TunnelExitSocket.__init__(self, parent.circuit_id, parent.overlay, parent.sock_addr, parent.mid)
parent.close()
EndpointListener.__init__(self, self.endpoint)

self.endpoint.add_listener(self)

def enable(self):
pass

@property
def enabled(self):
return True

def sendto(self, data, destination):
if DataChecker.is_allowed(data):
self.endpoint.send(destination, data)
else:
raise AssertionError("Attempted to exit data which is not allowed" % repr(data))

def on_packet(self, packet):
source_address, data = packet
self.datagramReceived(data, source_address)

def close(self):
return succeed(True)
47 changes: 0 additions & 47 deletions Tribler/community/triblerchain/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class TriblerChainCommunity(TrustChainCommunity):
"""
BLOCK_CLASS = TriblerChainBlock
DB_CLASS = TriblerChainDB
SIGN_DELAY = 5
master_peer = Peer("3081a7301006072a8648ce3d020106052b81040027038192000405c66d3deddb1721787a247b2285118c06ce9fb"
"20ebd3546969fa2f4811fa92426637423d3bac1510f92b33e2ff5a785bf54eb3b28d29a77d557011d7d5241243c"
"9c89c987cd049404c4024999e1505fa96e1d6668234bde28a666d458d67251d17ff45185515a28967ddcf50503c"
Expand All @@ -45,11 +44,6 @@ class TriblerChainCommunity(TrustChainCommunity):
def __init__(self, *args, **kwargs):
self.tribler_session = kwargs.pop('tribler_session', None)
super(TriblerChainCommunity, self).__init__(*args, **kwargs)
self.notifier = None

if self.tribler_session:
self.notifier = self.tribler_session.notifier
self.notifier.add_observer(self.on_tunnel_remove, NTFY_TUNNEL, [NTFY_REMOVE])

# We store the bytes send and received in the tunnel community in a dictionary.
# The key is the public key of the peer being interacted with, the value a tuple of the up and down bytes
Expand Down Expand Up @@ -85,51 +79,10 @@ def get_statistics(self, public_key=None):
statistics["total_down"] = 0
return statistics

def on_tunnel_remove(self, subject, change_type, tunnel, sock_addr):
"""
Handler for the remove event of a tunnel. This function will attempt to create a block for the amounts that
were transferred using the tunnel.
:param subject: Category of the notifier event
:param change_type: Type of the notifier event
:param tunnel: The tunnel that was removed (closed)
:param sock_addr: The address of the peer with whom this node has interacted in the tunnel
"""
tunnel_peer = None
for verified_peer in self.tribler_session.lm.tunnel_community.network.verified_peers:
if verified_peer.address == sock_addr:
tunnel_peer = verified_peer
break

if not tunnel_peer:
self.logger.warning("Could not find interacting peer for signing a TriblerChain block!")
return

up = tunnel.bytes_up
down = tunnel.bytes_down
pk = tunnel_peer.public_key.key_to_bin()

# If the transaction is not big enough we discard the bytes up and down.
if up + down >= MIN_TRANSACTION_SIZE:
# Tie breaker to prevent both parties from requesting
if up > down or (up == down and self.my_peer.public_key.key_to_bin() > pk):
self.register_task("sign_%s" % tunnel.circuit_id,
reactor.callLater(self.SIGN_DELAY, self.sign_block, tunnel_peer, pk,
{'up': tunnel.bytes_up, 'down': tunnel.bytes_down}))
else:
pend = self.pending_bytes.get(pk)
if not pend:
task = self.register_task("cleanup_pending_%s" % tunnel.circuit_id,
reactor.callLater(2 * 60, self.cleanup_pending, pk))
self.pending_bytes[pk] = PendingBytes(up, down, task)
else:
pend.add(up, down)

def cleanup_pending(self, public_key):
self.pending_bytes.pop(public_key, None)

def unload(self):
if self.notifier:
self.notifier.remove_observer(self.on_tunnel_remove)
for pk in self.pending_bytes:
if self.pending_bytes[pk].clean is not None:
self.pending_bytes[pk].clean.reset(0)
Expand Down
Loading

0 comments on commit bfa1394

Please sign in to comment.