Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added delay when removing tunnel info, fixed mid encoding #67

Merged
merged 3 commits into from
Mar 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 69 additions & 45 deletions ipv8/messaging/anonymization/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def __init__(self):
self.max_packets_without_reply = 50
self.dht_lookup_interval = 30

# We have a small delay when removing circuits/relays/exit nodes. This is to allow some post-mortem data
# to flow over the circuit (i.e. bandwidth payouts to intermediate nodes in a circuit).
self.remove_tunnel_delay = 5

self.become_exitnode = False


Expand Down Expand Up @@ -180,11 +184,11 @@ def become_exitnode(self):
def unload(self):
# Remove all circuits/relays/exitsockets
for circuit_id in self.circuits.keys():
self.remove_circuit(circuit_id, 'unload', destroy=True)
self.remove_circuit(circuit_id, 'unload', remove_now=True, destroy=True)
for circuit_id in self.relay_from_to.keys():
self.remove_relay(circuit_id, 'unload', destroy=True, both_sides=False)
self.remove_relay(circuit_id, 'unload', remove_now=True, destroy=True, both_sides=False)
for circuit_id in self.exit_sockets.keys():
self.remove_exit_socket(circuit_id, 'unload', destroy=True)
self.remove_exit_socket(circuit_id, 'unload', remove_now=True, destroy=True)

self.request_cache.clear()

Expand Down Expand Up @@ -309,7 +313,7 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ
# Finally, construct the Circuit object and send the CREATE message
circuit_id = self._generate_circuit_id(first_hop.address)
circuit = Circuit(circuit_id, goal_hops, first_hop.address, self, ctype, callback,
required_exit, first_hop.mid.encode('hex'), info_hash)
required_exit, first_hop.mid, info_hash)

self.request_cache.add(CircuitRequestCache(self, circuit))

Expand All @@ -331,23 +335,31 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ

return circuit_id

def remove_circuit(self, circuit_id, additional_info='', destroy=False):
def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False):
assert isinstance(circuit_id, (long, int)), type(circuit_id)

circuit = self.circuits.pop(circuit_id, None)
if circuit:
if destroy:
self.destroy_circuit(circuit)
def remove_circuit_info():
circuit = self.circuits.pop(circuit_id, None)
if circuit:
if destroy:
self.destroy_circuit(circuit)

self.logger.info("removing circuit %d " + additional_info, circuit_id)

self.logger.info("removing circuit %d " + additional_info, circuit_id)
circuit.destroy()

circuit.destroy()
# Clean up the directions dictionary
if circuit_id in self.directions:
del self.directions[circuit_id]

# Clean up the directions dictionary
if circuit_id in self.directions:
del self.directions[circuit_id]
if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_circuit_info()
elif not self.is_pending_task_active("remove_circuit_%s" % circuit_id):
self.register_task("remove_circuit_%s" % circuit_id,
reactor.callLater(self.settings.remove_tunnel_delay, remove_circuit_info))

def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destroy_from=None, both_sides=True):
def remove_relay(self, circuit_id, additional_info='', remove_now=False, destroy=False,
got_destroy_from=None, both_sides=True):
"""
Remove a relay and all information associated with the relay. Return the relays that have been removed.
"""
Expand All @@ -364,41 +376,53 @@ def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destro

removed_relays = []
for cid in to_remove:
# Remove the relay
self.logger.info("Removing relay %d %s", cid, additional_info)
relay = self.relay_from_to.pop(cid, None)
if relay:
removed_relays.append(relay)
def remove_relay_info(cid_to_remove):
# Remove the relay
self.logger.info("Removing relay %d %s", cid_to_remove, additional_info)

# Remove old session key
if cid in self.relay_session_keys:
del self.relay_session_keys[cid]
relay = self.relay_from_to.pop(cid_to_remove, None)
if relay:
removed_relays.append(relay)

# Clean directions dictionary
self.directions.pop(cid, None)
# Remove old session key
if cid_to_remove in self.relay_session_keys:
del self.relay_session_keys[cid_to_remove]

return removed_relays
# Clean directions dictionary
self.directions.pop(cid_to_remove, None)

if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_relay_info(cid)
elif not self.is_pending_task_active("remove_relay_%s" % cid):
self.register_task("remove_relay_%s" % cid,
reactor.callLater(self.settings.remove_tunnel_delay,
lambda cid_copy=cid: remove_relay_info(cid_copy)))

def remove_exit_socket(self, circuit_id, additional_info='', destroy=False):
exit_socket = self.exit_sockets.pop(circuit_id, None)
return removed_relays

if exit_socket:
if destroy:
self.destroy_exit_socket(circuit_id)
def remove_exit_socket(self, circuit_id, additional_info='', remove_now=False, destroy=False):
def remove_exit_socket_info():
exit_socket = self.exit_sockets.pop(circuit_id, None)
if exit_socket:
if destroy:
self.destroy_exit_socket(circuit_id)

# Close socket
if exit_socket.enabled:
self.logger.info("Removing exit socket %d %s", circuit_id, additional_info)
# Close socket
if exit_socket.enabled:
self.logger.info("Removing exit socket %d %s", circuit_id, additional_info)

def on_exit_socket_closed(_):
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]
def on_exit_socket_closed(_):
# Remove old session key
if circuit_id in self.relay_session_keys:
del self.relay_session_keys[circuit_id]

exit_socket.close().addCallback(on_exit_socket_closed)
exit_socket.close().addCallback(on_exit_socket_closed)

else:
self.logger.error("could not remove exit socket %d %s", circuit_id, additional_info)
if self.settings.remove_tunnel_delay == 0 or remove_now:
remove_exit_socket_info()
elif not self.is_pending_task_active("remove_exit_socket_%s" % circuit_id):
self.register_task("remove_exit_socket_%s" % circuit_id,
reactor.callLater(self.settings.remove_tunnel_delay, remove_exit_socket_info))

def destroy_circuit(self, circuit, reason=0):
sock_addr = circuit.sock_addr
Expand Down Expand Up @@ -863,7 +887,7 @@ def on_extend(self, source_address, data, _):
if not extend_candidate:
extend_candidate = Peer(payload.node_public_key, payload.node_addr)
self.network.add_verified_peer(extend_candidate)
extend_candidate_mid = extend_candidate.mid.encode('hex')
extend_candidate_mid = extend_candidate.mid

self.logger.info("on_extend send CREATE for circuit (%s, %d) to %s:%d", source_address,
circuit_id,
Expand Down Expand Up @@ -943,8 +967,8 @@ def on_data(self, sock_addr, packet):
circuit.beat_heart()
self.increase_bytes_received(circuit, len(packet))

if DataChecker.could_be_dispersy(data):
self.logger.debug("Giving incoming data packet to dispersy")
if DataChecker.could_be_ipv8(data):
self.logger.debug("Giving incoming data packet to IPv8")
self.logger.debug("CIRCUIT ID = %d", circuit_id)
self.on_packet((origin, data[4:]), circuit_id=u"circuit_%d" % circuit_id)
else:
Expand Down Expand Up @@ -1005,7 +1029,7 @@ def on_destroy(self, source_address, data):
self.remove_circuit(circuit_id, "Got destroy")

def exit_data(self, circuit_id, sock_addr, destination, data):
if not self.become_exitnode() and not DataChecker.could_be_dispersy(data):
if not self.become_exitnode() and not DataChecker.could_be_ipv8(data):
self.logger.error("Dropping data packets, refusing to be an exit node for data")

elif circuit_id in self.exit_sockets:
Expand Down
4 changes: 2 additions & 2 deletions ipv8/messaging/anonymization/hidden_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ def tunnel_data(self, circuit, destination, message_type, payload):
pre = destination
self.send_data([circuit.sock_addr], circuit.circuit_id, pre, post, TUNNEL_PREFIX + packet)

def remove_circuit(self, circuit_id, additional_info='', destroy=False):
super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, destroy)
def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False):
super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, remove_now, destroy)

circuit = self.my_intro_points.pop(circuit_id, None)
if circuit:
Expand Down
6 changes: 3 additions & 3 deletions ipv8/messaging/anonymization/tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
CIRCUIT_TYPE_DATA = 'DATA'

# The other circuits are supposed to end in a connectable node, not allowed to exit
# anything else than dispersy messages, used for setting up end-to-end circuits
# anything else than IPv8 messages, used for setting up end-to-end circuits
CIRCUIT_TYPE_IP = 'IP'
CIRCUIT_TYPE_RP = 'RP'
CIRCUIT_TYPE_RENDEZVOUS = 'RENDEZVOUS'
Expand Down Expand Up @@ -72,15 +72,15 @@ def could_be_dht(data):
return False

@staticmethod
def could_be_dispersy(data):
def could_be_ipv8(data):
return data[:4] == "ffffffff".decode("HEX") and len(data) >= (23 + 4)

@staticmethod
def is_allowed(data):
return (DataChecker.could_be_utp(data) or
DataChecker.could_be_udp_tracker(data) or
DataChecker.could_be_dht(data) or
DataChecker.could_be_dispersy(data))
DataChecker.could_be_ipv8(data))


class Tunnel(object):
Expand Down
1 change: 1 addition & 0 deletions test/messaging/anonymization/test_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def create_node(self):
settings.become_exitnode = False
settings.min_circuits = 0
settings.max_circuits = 0
settings.remove_tunnel_delay = 0
ipv8 = MockIPv8(u"curve25519", TunnelCommunity, settings=settings)
# Then kill all automated circuit creation
ipv8.overlay.cancel_all_pending_tasks()
Expand Down