From 987b21cec8ffb3d8caa406eee296c269930448c2 Mon Sep 17 00:00:00 2001 From: Martijn de Vos Date: Sun, 4 Mar 2018 13:25:41 +0100 Subject: [PATCH] Added optional delay when removing tunnels --- ipv8/messaging/anonymization/community.py | 102 +++++++++++------- .../messaging/anonymization/test_community.py | 1 + 2 files changed, 65 insertions(+), 38 deletions(-) diff --git a/ipv8/messaging/anonymization/community.py b/ipv8/messaging/anonymization/community.py index 7e78c8479..048d760e0 100644 --- a/ipv8/messaging/anonymization/community.py +++ b/ipv8/messaging/anonymization/community.py @@ -67,6 +67,10 @@ def __init__(self): self.max_packets_without_reply = 50 self.dht_lookup_interval = 30 + # We have a small delay when removing circuits/relays/exit nodes. This is to allow some post-mortem data + # to flow over the circuit (i.e. bandwidth payouts to intermediate nodes in a circuit). + self.remove_tunnel_delay = 5 + self.become_exitnode = False @@ -334,18 +338,26 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ def remove_circuit(self, circuit_id, additional_info='', destroy=False): assert isinstance(circuit_id, (long, int)), type(circuit_id) - circuit = self.circuits.pop(circuit_id, None) - if circuit: - if destroy: - self.destroy_circuit(circuit) + def remove_circuit_info(): + circuit = self.circuits.pop(circuit_id, None) + if circuit: + if destroy: + self.destroy_circuit(circuit) + + self.logger.info("removing circuit %d " + additional_info, circuit_id) - self.logger.info("removing circuit %d " + additional_info, circuit_id) + circuit.destroy() - circuit.destroy() + # Clean up the directions dictionary + if circuit_id in self.directions: + del self.directions[circuit_id] - # Clean up the directions dictionary - if circuit_id in self.directions: - del self.directions[circuit_id] + if not self.is_pending_task_active("remove_circuit_%s" % circuit_id): + if self.settings.remove_tunnel_delay == 0: + remove_circuit_info() + else: + self.register_task("remove_circuit_%s" % circuit_id, + reactor.callLater(self.settings.remove_tunnel_delay, remove_circuit_info)) def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destroy_from=None, both_sides=True): """ @@ -364,41 +376,55 @@ def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destro removed_relays = [] for cid in to_remove: - # Remove the relay - self.logger.info("Removing relay %d %s", cid, additional_info) - relay = self.relay_from_to.pop(cid, None) - if relay: - removed_relays.append(relay) - - # Remove old session key - if cid in self.relay_session_keys: - del self.relay_session_keys[cid] + def remove_relay_info(cid_to_remove): + # Remove the relay + self.logger.info("Removing relay %d %s", cid_to_remove, additional_info) - # Clean directions dictionary - self.directions.pop(cid, None) + relay = self.relay_from_to.pop(cid_to_remove, None) + if relay: + removed_relays.append(relay) - return removed_relays + # Remove old session key + if cid_to_remove in self.relay_session_keys: + del self.relay_session_keys[cid_to_remove] - def remove_exit_socket(self, circuit_id, additional_info='', destroy=False): - exit_socket = self.exit_sockets.pop(circuit_id, None) + # Clean directions dictionary + self.directions.pop(cid_to_remove, None) - if exit_socket: - if destroy: - self.destroy_exit_socket(circuit_id) - - # Close socket - if exit_socket.enabled: - self.logger.info("Removing exit socket %d %s", circuit_id, additional_info) - - def on_exit_socket_closed(_): - # Remove old session key - if circuit_id in self.relay_session_keys: - del self.relay_session_keys[circuit_id] + if not self.is_pending_task_active("remove_relay_%s" % cid): + if self.settings.remove_tunnel_delay == 0: + remove_relay_info(cid) + else: + self.register_task("remove_relay_%s" % cid, + reactor.callLater(self.settings.remove_tunnel_delay, + lambda cid_copy=cid:remove_relay_info(cid_copy))) - exit_socket.close().addCallback(on_exit_socket_closed) + return removed_relays - else: - self.logger.error("could not remove exit socket %d %s", circuit_id, additional_info) + def remove_exit_socket(self, circuit_id, additional_info='', destroy=False): + def remove_exit_socket_info(): + exit_socket = self.exit_sockets.pop(circuit_id, None) + if exit_socket: + if destroy: + self.destroy_exit_socket(circuit_id) + + # Close socket + if exit_socket.enabled: + self.logger.info("Removing exit socket %d %s", circuit_id, additional_info) + + def on_exit_socket_closed(_): + # Remove old session key + if circuit_id in self.relay_session_keys: + del self.relay_session_keys[circuit_id] + + exit_socket.close().addCallback(on_exit_socket_closed) + + if not self.is_pending_task_active("remove_exit_socket_%s" % circuit_id): + if self.settings.remove_tunnel_delay == 0: + remove_exit_socket_info() + else: + self.register_task("remove_exit_socket_%s" % circuit_id, + reactor.callLater(self.settings.remove_tunnel_delay, remove_exit_socket_info)) def destroy_circuit(self, circuit, reason=0): sock_addr = circuit.sock_addr diff --git a/test/messaging/anonymization/test_community.py b/test/messaging/anonymization/test_community.py index 515be0ed7..8fecf295c 100644 --- a/test/messaging/anonymization/test_community.py +++ b/test/messaging/anonymization/test_community.py @@ -28,6 +28,7 @@ def create_node(self): settings.become_exitnode = False settings.min_circuits = 0 settings.max_circuits = 0 + settings.remove_tunnel_delay = 0 ipv8 = MockIPv8(u"curve25519", TunnelCommunity, settings=settings) # Then kill all automated circuit creation ipv8.overlay.cancel_all_pending_tasks()