Skip to content

Commit

Permalink
Merge pull request #67 from devos50/remove_hex_encode
Browse files Browse the repository at this point in the history
Added delay when removing tunnel info, fixed mid encoding
  • Loading branch information
qstokkink authored Mar 4, 2018
2 parents 577a880 + 9c71283 commit 9e417ab
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 50 deletions.
114 changes: 69 additions & 45 deletions ipv8/messaging/anonymization/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def __init__(self):
self.max_packets_without_reply = 50
self.dht_lookup_interval = 30

# We have a small delay when removing circuits/relays/exit nodes. This is to allow some post-mortem data
# to flow over the circuit (i.e. bandwidth payouts to intermediate nodes in a circuit).
self.remove_tunnel_delay = 5

self.become_exitnode = False


Expand Down Expand Up @@ -180,11 +184,11 @@ def become_exitnode(self):
def unload(self):
# Remove all circuits/relays/exitsockets
for circuit_id in self.circuits.keys():
self.remove_circuit(circuit_id, 'unload', destroy=True)
self.remove_circuit(circuit_id, 'unload', remove_now=True, destroy=True)
for circuit_id in self.relay_from_to.keys():
self.remove_relay(circuit_id, 'unload', destroy=True, both_sides=False)
self.remove_relay(circuit_id, 'unload', remove_now=True, destroy=True, both_sides=False)
for circuit_id in self.exit_sockets.keys():
self.remove_exit_socket(circuit_id, 'unload', destroy=True)
self.remove_exit_socket(circuit_id, 'unload', remove_now=True, destroy=True)

self.request_cache.clear()

Expand Down Expand Up @@ -309,7 +313,7 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ
# Finally, construct the Circuit object and send the CREATE message
circuit_id = self._generate_circuit_id(first_hop.address)
circuit = Circuit(circuit_id, goal_hops, first_hop.address, self, ctype, callback,
required_exit, first_hop.mid.encode('hex'), info_hash)
required_exit, first_hop.mid, info_hash)

self.request_cache.add(CircuitRequestCache(self, circuit))

Expand All @@ -331,23 +335,31 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ

return circuit_id

def remove_circuit(self, circuit_id, additional_info='', destroy=False):
def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False):
assert isinstance(circuit_id, (long, int)), type(circuit_id)

circuit = self.circuits.pop(circuit_id, None)
if circuit:
if destroy:
self.destroy_circuit(circuit)
def remove_circuit_info():
circuit = self.circuits.pop(circuit_id, None)
if circuit:
if destroy:
self.destroy_circuit(circuit)

self.logger.info("removing circuit %d " + additional_info, circuit_id)

self.logger.info("removing circuit %d " + additional_info, circuit_id)
circuit.destroy()

circuit.destroy()
# Clean up the directions dictionary
if circuit_id in self.directions:
del self.directions[circuit_id]

# Clean up the directions dictionary
if circuit_id in self.directions:
del self.directions[circuit_id]
if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_circuit_info()
elif not self.is_pending_task_active("remove_circuit_%s" % circuit_id):
self.register_task("remove_circuit_%s" % circuit_id,
reactor.callLater(self.settings.remove_tunnel_delay, remove_circuit_info))

def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destroy_from=None, both_sides=True):
def remove_relay(self, circuit_id, additional_info='', remove_now=False, destroy=False,
got_destroy_from=None, both_sides=True):
"""
Remove a relay and all information associated with the relay. Return the relays that have been removed.
"""
Expand All @@ -364,41 +376,53 @@ def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destro

removed_relays = []
for cid in to_remove:
# Remove the relay
self.logger.info("Removing relay %d %s", cid, additional_info)
relay = self.relay_from_to.pop(cid, None)
if relay:
removed_relays.append(relay)
def remove_relay_info(cid_to_remove):
# Remove the relay
self.logger.info("Removing relay %d %s", cid_to_remove, additional_info)

# Remove old session key
if cid in self.relay_session_keys:
del self.relay_session_keys[cid]
relay = self.relay_from_to.pop(cid_to_remove, None)
if relay:
removed_relays.append(relay)

# Clean directions dictionary
self.directions.pop(cid, None)
# Remove old session key
if cid_to_remove in self.relay_session_keys:
del self.relay_session_keys[cid_to_remove]

return removed_relays
# Clean directions dictionary
self.directions.pop(cid_to_remove, None)

if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_relay_info(cid)
elif not self.is_pending_task_active("remove_relay_%s" % cid):
self.register_task("remove_relay_%s" % cid,
reactor.callLater(self.settings.remove_tunnel_delay,
lambda cid_copy=cid: remove_relay_info(cid_copy)))

def remove_exit_socket(self, circuit_id, additional_info='', destroy=False):
exit_socket = self.exit_sockets.pop(circuit_id, None)
return removed_relays

if exit_socket:
if destroy:
self.destroy_exit_socket(circuit_id)
def remove_exit_socket(self, circuit_id, additional_info='', remove_now=False, destroy=False):
def remove_exit_socket_info():
exit_socket = self.exit_sockets.pop(circuit_id, None)
if exit_socket:
if destroy:
self.destroy_exit_socket(circuit_id)

# Close socket
if exit_socket.enabled:
self.logger.info("Removing exit socket %d %s", circuit_id, additional_info)
# Close socket
if exit_socket.enabled:
self.logger.info("Removing exit socket %d %s", circuit_id, additional_info)

def on_exit_socket_closed(_):
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]
def on_exit_socket_closed(_):
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]

exit_socket.close().addCallback(on_exit_socket_closed)
exit_socket.close().addCallback(on_exit_socket_closed)

else:
self.logger.error("could not remove exit socket %d %s", circuit_id, additional_info)
if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_exit_socket_info()
elif not self.is_pending_task_active("remove_exit_socket_%s" % circuit_id):
self.register_task("remove_exit_socket_%s" % circuit_id,
reactor.callLater(self.settings.remove_tunnel_delay, remove_exit_socket_info))

def destroy_circuit(self, circuit, reason=0):
sock_addr = circuit.sock_addr
Expand Down Expand Up @@ -863,7 +887,7 @@ def on_extend(self, source_address, data, _):
if not extend_candidate:
extend_candidate = Peer(payload.node_public_key, payload.node_addr)
self.network.add_verified_peer(extend_candidate)
extend_candidate_mid = extend_candidate.mid.encode('hex')
extend_candidate_mid = extend_candidate.mid

self.logger.info("on_extend send CREATE for circuit (%s, %d) to %s:%d", source_address,
circuit_id,
Expand Down Expand Up @@ -943,8 +967,8 @@ def on_data(self, sock_addr, packet):
circuit.beat_heart()
self.increase_bytes_received(circuit, len(packet))

if DataChecker.could_be_dispersy(data):
self.logger.debug("Giving incoming data packet to dispersy")
if DataChecker.could_be_ipv8(data):
self.logger.debug("Giving incoming data packet to IPv8")
self.logger.debug("CIRCUIT ID = %d", circuit_id)
self.on_packet((origin, data[4:]), circuit_id=u"circuit_%d" % circuit_id)
else:
Expand Down Expand Up @@ -1005,7 +1029,7 @@ def on_destroy(self, source_address, data):
self.remove_circuit(circuit_id, "Got destroy")

def exit_data(self, circuit_id, sock_addr, destination, data):
if not self.become_exitnode() and not DataChecker.could_be_dispersy(data):
if not self.become_exitnode() and not DataChecker.could_be_ipv8(data):
self.logger.error("Dropping data packets, refusing to be an exit node for data")

elif circuit_id in self.exit_sockets:
Expand Down
4 changes: 2 additions & 2 deletions ipv8/messaging/anonymization/hidden_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def tunnel_data(self, circuit, destination, message_type, payload):
pre = destination
self.send_data([circuit.sock_addr], circuit.circuit_id, pre, post, TUNNEL_PREFIX + packet)

def remove_circuit(self, circuit_id, additional_info='', destroy=False):
super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, destroy)
def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False):
super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, remove_now, destroy)

circuit = self.my_intro_points.pop(circuit_id, None)
if circuit:
Expand Down
6 changes: 3 additions & 3 deletions ipv8/messaging/anonymization/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
CIRCUIT_TYPE_DATA = 'DATA'

# The other circuits are supposed to end in a connectable node, not allowed to exit
# anything else than dispersy messages, used for setting up end-to-end circuits
# anything else than IPv8 messages, used for setting up end-to-end circuits
CIRCUIT_TYPE_IP = 'IP'
CIRCUIT_TYPE_RP = 'RP'
CIRCUIT_TYPE_RENDEZVOUS = 'RENDEZVOUS'
Expand Down Expand Up @@ -72,15 +72,15 @@ def could_be_dht(data):
return False

@staticmethod
def could_be_dispersy(data):
def could_be_ipv8(data):
return data[:4] == "ffffffff".decode("HEX") and len(data) >= (23 + 4)

@staticmethod
def is_allowed(data):
return (DataChecker.could_be_utp(data) or
DataChecker.could_be_udp_tracker(data) or
DataChecker.could_be_dht(data) or
DataChecker.could_be_dispersy(data))
DataChecker.could_be_ipv8(data))


class Tunnel(object):
Expand Down
1 change: 1 addition & 0 deletions test/messaging/anonymization/test_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def create_node(self):
settings.become_exitnode = False
settings.min_circuits = 0
settings.max_circuits = 0
settings.remove_tunnel_delay = 0
ipv8 = MockIPv8(u"curve25519", TunnelCommunity, settings=settings)
# Then kill all automated circuit creation
ipv8.overlay.cancel_all_pending_tasks()
Expand Down

0 comments on commit 9e417ab

Please sign in to comment.