diff --git a/ipv8/messaging/interfaces/endpoint.py b/ipv8/messaging/interfaces/endpoint.py index b5bebaf4b..3c44fa32e 100644 --- a/ipv8/messaging/interfaces/endpoint.py +++ b/ipv8/messaging/interfaces/endpoint.py @@ -76,7 +76,7 @@ def open(self): pass @abc.abstractmethod - def close(self, timeout=0.0): + def close(self): pass @@ -87,7 +87,7 @@ class EndpointListener(object): __metaclass__ = abc.ABCMeta - def __init__(self, endpoint, main_thread=False): + def __init__(self, endpoint, main_thread=True): """ Create a new listener. diff --git a/ipv8/messaging/interfaces/udp/endpoint.py b/ipv8/messaging/interfaces/udp/endpoint.py index b88fb4107..b8d8ad577 100644 --- a/ipv8/messaging/interfaces/udp/endpoint.py +++ b/ipv8/messaging/interfaces/udp/endpoint.py @@ -1,223 +1,56 @@ -import errno -from select import select -import socket -import sys -import threading -from time import time - -from ..endpoint import DataTooBigException, Endpoint, EndpointClosedException, IllegalDestination - -if sys.platform == 'win32': - SOCKET_BLOCK_ERRORCODE = 10035 # WSAEWOULDBLOCK -else: - SOCKET_BLOCK_ERRORCODE = errno.EWOULDBLOCK +from twisted.internet import protocol, reactor, error +from ..endpoint import Endpoint, EndpointClosedException UDP_MAX_SIZE = 2 ** 16 - 60 -class UDPEndpoint(Endpoint): - """ - UDP implementation for sending messages over the Internet. - """ +class UDPEndpoint(Endpoint, protocol.DatagramProtocol): def __init__(self, port, ip="0.0.0.0"): - """ - Bind an interface to the WWW on a certain port (and IP). - - :param port: the port to use - :type port: int - :param ip: the interface to bind to - :type ip: string - """ super(UDPEndpoint, self).__init__() - self._port = port self._ip = ip - - self._thread = None - self._socket = None - - self._sendqueue_lock = threading.RLock() - self._sendqueue = [] - self._running = False + self._listening_port = False - def assert_open(self): - """ - Check if the underlying socket is open, or raise an exception. - - :raises: EndpointClosedException if the socket is closed - """ - if not self.is_open(): - raise EndpointClosedException(self) - - def is_open(self): - """ - Check if the underlying socket is open. - """ - return self._socket and self._running - - def get_address(self): - """ - Get the address for this Endpoint. - """ - self.assert_open() - return self._socket.getsockname() + def datagramReceived(self, datagram, addr): + self.notify_listeners((addr, datagram)) def send(self, socket_address, packet): - """ - Send a UDP packet to a socket_address. - - :param socket_address: the socket_address to send the packet to - :param packet: the packet to send to the socket_address - :return: whether sending was successful - """ self.assert_open() - - if len(packet) > UDP_MAX_SIZE: - raise DataTooBigException(len(packet), UDP_MAX_SIZE) - if socket_address == ('0.0.0.0', 0): - raise IllegalDestination() - - try: - self._socket.sendto(packet, socket_address) - except socket.error: - with self._sendqueue_lock: - did_have_senqueue = bool(self._sendqueue) - self._sendqueue.append((time(), socket_address, packet)) - - # If we did not have a sendqueue, then we need to call process_sendqueue in order send these messages - if not did_have_senqueue: - self._process_sendqueue() - - return True + self.transport.write(packet, socket_address) def open(self): - """ - Open this Endpoint for sending and receiving packets. - """ for _ in xrange(10000): try: + self._listening_port = reactor.listenUDP(self._port, self, self._ip, UDP_MAX_SIZE) self._logger.debug("Listening at %d", self._port) - self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 870400) - self._socket.bind((self._ip, self._port)) - self._socket.setblocking(0) - - self._port = self._socket.getsockname()[1] - except socket.error: + break + except error.CannotListenError: self._logger.debug("Listening failed at %d", self._port) self._port += 1 continue - break - self._running = True - self._thread = threading.Thread(name="UDPEndpoint", target=self._loop) - self._thread.daemon = True - self._thread.start() return True - def close(self, timeout=10.0): - """ - Close this Endpoint for sending and receiving. - :param timeout: the maximum amount of time to wait for the socket to close - :return: whether closing occurred naturally - """ - self._running = False - result = True - - if timeout > 0.0: - self._thread.join(timeout) - - if self._thread.is_alive(): - self._logger.error("the endpoint thread is still running (after waiting %f seconds)", timeout) - result = False - - else: - if self._thread.is_alive(): - self._logger.error("the endpoint thread is still running " - "(use timeout > 0.0 to ensure the thread stops)") - result = False - - try: - self._socket.close() - except socket.error as exception: - self._logger.exception("%s", exception) - result = False + def assert_open(self): + if not self._running: + raise EndpointClosedException(self) - return result + def close(self): + self._running = False + return self._listening_port.stopListening() - def _loop(self): + def get_address(self): """ - The loop handling sending and receiving messages over this Endpoint. + Get the address for this Endpoint. """ self.assert_open() + return (self._listening_port.getHost().host, + self._listening_port.getHost().port) - recvfrom = self._socket.recvfrom - socket_list = [self._socket.fileno()] - - prev_sendqueue = 0 - while self._running: - # This is a tricky, if we are running on the DAS4 whenever a socket is ready for writing all processes of - # this node will try to write. Therefore, we have to limit the frequency of trying to write a bit. - if self._sendqueue and (time() - prev_sendqueue) > 0.1: - read_list, write_list, _ = select(socket_list, socket_list, [], 0.1) - else: - read_list, write_list, _ = select(socket_list, [], [], 0.1) - - # Furthermore, if we are allowed to send, process sendqueue immediately - if write_list: - self._process_sendqueue() - prev_sendqueue = time() - - if read_list: - packets = [] - try: - while True: - (data, sock_addr) = recvfrom(65535) - if data: - packets.append((sock_addr, data)) - else: - break - - except socket.error as e: - if e.errno != errno.EAGAIN: - self._logger.debug('socket error: %s' % repr(e)) - - finally: - if packets: - for packet in packets: - self.notify_listeners(packet) - - def _process_sendqueue(self): + def is_open(self): """ - Send any outstanding/previously undeliverable messages. + Check if the underlying socket is open. """ - self.assert_open() - - with self._sendqueue_lock: - if self._sendqueue: - index = 0 - NUM_PACKETS = min(max(50, len(self._sendqueue) / 10), len(self._sendqueue)) - self._logger.debug("%d left in sendqueue, trying to send %d packets", - len(self._sendqueue), NUM_PACKETS) - - allowed_timestamp = time() - 300 - - for i in xrange(NUM_PACKETS): - queued_at, sock_addr, data = self._sendqueue[i] - if queued_at > allowed_timestamp: - try: - self._socket.sendto(data, sock_addr) - index += 1 - except socket.error as e: - if e[0] != SOCKET_BLOCK_ERRORCODE: - self._logger.warning("could not send %d to %s (%d in sendqueue)", - len(data), sock_addr, len(self._sendqueue)) - break - else: - index += 1 - - self._sendqueue = self._sendqueue[index:] - if self._sendqueue: - self._logger.debug("%d left in sendqueue", len(self._sendqueue)) + return self._listening_port and self._running diff --git a/ipv8/overlay.py b/ipv8/overlay.py index b704b587c..cae8e5e28 100644 --- a/ipv8/overlay.py +++ b/ipv8/overlay.py @@ -25,7 +25,7 @@ def __init__(self, master_peer, my_peer, endpoint, network): :param endpoint: the endpoint to use for messaging :param network: the network graph backend """ - EndpointListener.__init__(self, endpoint, False) + EndpointListener.__init__(self, endpoint) TaskManager.__init__(self) self.serializer = self.get_serializer() self.crypto = ECCrypto() diff --git a/ipv8/test/mocking/exit_socket.py b/ipv8/test/mocking/exit_socket.py index 7da8db7bb..34ac6b399 100644 --- a/ipv8/test/mocking/exit_socket.py +++ b/ipv8/test/mocking/exit_socket.py @@ -15,7 +15,7 @@ def __init__(self, parent): TunnelExitSocket.__init__(self, parent.circuit_id, parent.overlay, parent.sock_addr, parent.mid) parent.close() - EndpointListener.__init__(self, self.endpoint) + EndpointListener.__init__(self, self.endpoint, main_thread=False) self.endpoint.add_listener(self) diff --git a/ipv8/test/mocking/ipv8.py b/ipv8/test/mocking/ipv8.py index b557d7851..fd326308f 100644 --- a/ipv8/test/mocking/ipv8.py +++ b/ipv8/test/mocking/ipv8.py @@ -15,6 +15,7 @@ def __init__(self, crypto_curve, overlay_class, *args, **kwargs): self.network = Network() self.my_peer = Peer(ECCrypto().generate_key(crypto_curve), self.endpoint.wan_address) self.overlay = overlay_class(self.my_peer, self.endpoint, self.network, *args, **kwargs) + self.overlay._use_main_thread = False self.discovery = MockWalk(self.overlay) self.overlay.my_estimated_wan = self.endpoint.wan_address