diff --git a/ipv8/messaging/anonymization/community.py b/ipv8/messaging/anonymization/community.py index 737f55e2f..19e4b6069 100644 --- a/ipv8/messaging/anonymization/community.py +++ b/ipv8/messaging/anonymization/community.py @@ -67,6 +67,10 @@ def __init__(self): self.max_packets_without_reply = 50 self.dht_lookup_interval = 30 + # We have a small delay when removing circuits/relays/exit nodes. This is to allow some post-mortem data + # to flow over the circuit (i.e. bandwidth payouts to intermediate nodes in a circuit). + self.remove_tunnel_delay = 5 + self.become_exitnode = False @@ -180,11 +184,11 @@ def become_exitnode(self): def unload(self): # Remove all circuits/relays/exitsockets for circuit_id in self.circuits.keys(): - self.remove_circuit(circuit_id, 'unload', destroy=True) + self.remove_circuit(circuit_id, 'unload', remove_now=True, destroy=True) for circuit_id in self.relay_from_to.keys(): - self.remove_relay(circuit_id, 'unload', destroy=True, both_sides=False) + self.remove_relay(circuit_id, 'unload', remove_now=True, destroy=True, both_sides=False) for circuit_id in self.exit_sockets.keys(): - self.remove_exit_socket(circuit_id, 'unload', destroy=True) + self.remove_exit_socket(circuit_id, 'unload', remove_now=True, destroy=True) self.request_cache.clear() @@ -309,7 +313,7 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ # Finally, construct the Circuit object and send the CREATE message circuit_id = self._generate_circuit_id(first_hop.address) circuit = Circuit(circuit_id, goal_hops, first_hop.address, self, ctype, callback, - required_exit, first_hop.mid.encode('hex'), info_hash) + required_exit, first_hop.mid, info_hash) self.request_cache.add(CircuitRequestCache(self, circuit)) @@ -331,23 +335,31 @@ def create_circuit(self, goal_hops, ctype=CIRCUIT_TYPE_DATA, callback=None, requ return circuit_id - def remove_circuit(self, circuit_id, additional_info='', destroy=False): + def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False): assert isinstance(circuit_id, (long, int)), type(circuit_id) - circuit = self.circuits.pop(circuit_id, None) - if circuit: - if destroy: - self.destroy_circuit(circuit) + def remove_circuit_info(): + circuit = self.circuits.pop(circuit_id, None) + if circuit: + if destroy: + self.destroy_circuit(circuit) + + self.logger.info("removing circuit %d " + additional_info, circuit_id) - self.logger.info("removing circuit %d " + additional_info, circuit_id) + circuit.destroy() - circuit.destroy() + # Clean up the directions dictionary + if circuit_id in self.directions: + del self.directions[circuit_id] - # Clean up the directions dictionary - if circuit_id in self.directions: - del self.directions[circuit_id] + if self.settings.remove_tunnel_delay == 0 or remove_now: + remove_circuit_info() + elif not self.is_pending_task_active("remove_circuit_%s" % circuit_id): + self.register_task("remove_circuit_%s" % circuit_id, + reactor.callLater(self.settings.remove_tunnel_delay, remove_circuit_info)) - def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destroy_from=None, both_sides=True): + def remove_relay(self, circuit_id, additional_info='', remove_now=False, destroy=False, + got_destroy_from=None, both_sides=True): """ Remove a relay and all information associated with the relay. Return the relays that have been removed. """ @@ -364,41 +376,53 @@ def remove_relay(self, circuit_id, additional_info='', destroy=False, got_destro removed_relays = [] for cid in to_remove: - # Remove the relay - self.logger.info("Removing relay %d %s", cid, additional_info) - relay = self.relay_from_to.pop(cid, None) - if relay: - removed_relays.append(relay) + def remove_relay_info(cid_to_remove): + # Remove the relay + self.logger.info("Removing relay %d %s", cid_to_remove, additional_info) - # Remove old session key - if cid in self.relay_session_keys: - del self.relay_session_keys[cid] + relay = self.relay_from_to.pop(cid_to_remove, None) + if relay: + removed_relays.append(relay) - # Clean directions dictionary - self.directions.pop(cid, None) + # Remove old session key + if cid_to_remove in self.relay_session_keys: + del self.relay_session_keys[cid_to_remove] - return removed_relays + # Clean directions dictionary + self.directions.pop(cid_to_remove, None) + + if self.settings.remove_tunnel_delay == 0 or remove_now: + remove_relay_info(cid) + elif not self.is_pending_task_active("remove_relay_%s" % cid): + self.register_task("remove_relay_%s" % cid, + reactor.callLater(self.settings.remove_tunnel_delay, + lambda cid_copy=cid: remove_relay_info(cid_copy))) - def remove_exit_socket(self, circuit_id, additional_info='', destroy=False): - exit_socket = self.exit_sockets.pop(circuit_id, None) + return removed_relays - if exit_socket: - if destroy: - self.destroy_exit_socket(circuit_id) + def remove_exit_socket(self, circuit_id, additional_info='', remove_now=False, destroy=False): + def remove_exit_socket_info(): + exit_socket = self.exit_sockets.pop(circuit_id, None) + if exit_socket: + if destroy: + self.destroy_exit_socket(circuit_id) - # Close socket - if exit_socket.enabled: - self.logger.info("Removing exit socket %d %s", circuit_id, additional_info) + # Close socket + if exit_socket.enabled: + self.logger.info("Removing exit socket %d %s", circuit_id, additional_info) - def on_exit_socket_closed(_): - # Remove old session key - if circuit_id in self.relay_session_keys: - del self.relay_session_keys[circuit_id] + def on_exit_socket_closed(_): + # Remove old session key + if circuit_id in self.relay_session_keys: + del self.relay_session_keys[circuit_id] - exit_socket.close().addCallback(on_exit_socket_closed) + exit_socket.close().addCallback(on_exit_socket_closed) - else: - self.logger.error("could not remove exit socket %d %s", circuit_id, additional_info) + if self.settings.remove_tunnel_delay == 0 or remove_now: + remove_exit_socket_info() + elif not self.is_pending_task_active("remove_exit_socket_%s" % circuit_id): + self.register_task("remove_exit_socket_%s" % circuit_id, + reactor.callLater(self.settings.remove_tunnel_delay, remove_exit_socket_info)) def destroy_circuit(self, circuit, reason=0): sock_addr = circuit.sock_addr @@ -863,7 +887,7 @@ def on_extend(self, source_address, data, _): if not extend_candidate: extend_candidate = Peer(payload.node_public_key, payload.node_addr) self.network.add_verified_peer(extend_candidate) - extend_candidate_mid = extend_candidate.mid.encode('hex') + extend_candidate_mid = extend_candidate.mid self.logger.info("on_extend send CREATE for circuit (%s, %d) to %s:%d", source_address, circuit_id, @@ -943,8 +967,8 @@ def on_data(self, sock_addr, packet): circuit.beat_heart() self.increase_bytes_received(circuit, len(packet)) - if DataChecker.could_be_dispersy(data): - self.logger.debug("Giving incoming data packet to dispersy") + if DataChecker.could_be_ipv8(data): + self.logger.debug("Giving incoming data packet to IPv8") self.logger.debug("CIRCUIT ID = %d", circuit_id) self.on_packet((origin, data[4:]), circuit_id=u"circuit_%d" % circuit_id) else: @@ -1005,7 +1029,7 @@ def on_destroy(self, source_address, data): self.remove_circuit(circuit_id, "Got destroy") def exit_data(self, circuit_id, sock_addr, destination, data): - if not self.become_exitnode() and not DataChecker.could_be_dispersy(data): + if not self.become_exitnode() and not DataChecker.could_be_ipv8(data): self.logger.error("Dropping data packets, refusing to be an exit node for data") elif circuit_id in self.exit_sockets: diff --git a/ipv8/messaging/anonymization/hidden_services.py b/ipv8/messaging/anonymization/hidden_services.py index e024668c9..ad063b784 100644 --- a/ipv8/messaging/anonymization/hidden_services.py +++ b/ipv8/messaging/anonymization/hidden_services.py @@ -98,8 +98,8 @@ def tunnel_data(self, circuit, destination, message_type, payload): pre = destination self.send_data([circuit.sock_addr], circuit.circuit_id, pre, post, TUNNEL_PREFIX + packet) - def remove_circuit(self, circuit_id, additional_info='', destroy=False): - super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, destroy) + def remove_circuit(self, circuit_id, additional_info='', remove_now=False, destroy=False): + super(HiddenTunnelCommunity, self).remove_circuit(circuit_id, additional_info, remove_now, destroy) circuit = self.my_intro_points.pop(circuit_id, None) if circuit: diff --git a/ipv8/messaging/anonymization/tunnel.py b/ipv8/messaging/anonymization/tunnel.py index 936cb81c0..4ea1445f7 100644 --- a/ipv8/messaging/anonymization/tunnel.py +++ b/ipv8/messaging/anonymization/tunnel.py @@ -25,7 +25,7 @@ CIRCUIT_TYPE_DATA = 'DATA' # The other circuits are supposed to end in a connectable node, not allowed to exit -# anything else than dispersy messages, used for setting up end-to-end circuits +# anything else than IPv8 messages, used for setting up end-to-end circuits CIRCUIT_TYPE_IP = 'IP' CIRCUIT_TYPE_RP = 'RP' CIRCUIT_TYPE_RENDEZVOUS = 'RENDEZVOUS' @@ -72,7 +72,7 @@ def could_be_dht(data): return False @staticmethod - def could_be_dispersy(data): + def could_be_ipv8(data): return data[:4] == "ffffffff".decode("HEX") and len(data) >= (23 + 4) @staticmethod @@ -80,7 +80,7 @@ def is_allowed(data): return (DataChecker.could_be_utp(data) or DataChecker.could_be_udp_tracker(data) or DataChecker.could_be_dht(data) or - DataChecker.could_be_dispersy(data)) + DataChecker.could_be_ipv8(data)) class Tunnel(object): diff --git a/test/messaging/anonymization/test_community.py b/test/messaging/anonymization/test_community.py index 515be0ed7..8fecf295c 100644 --- a/test/messaging/anonymization/test_community.py +++ b/test/messaging/anonymization/test_community.py @@ -28,6 +28,7 @@ def create_node(self): settings.become_exitnode = False settings.min_circuits = 0 settings.max_circuits = 0 + settings.remove_tunnel_delay = 0 ipv8 = MockIPv8(u"curve25519", TunnelCommunity, settings=settings) # Then kill all automated circuit creation ipv8.overlay.cancel_all_pending_tasks()