Skip to content

Commit

Permalink
Merge pull request #126 from ichorid/master
Browse files Browse the repository at this point in the history
Reimplement UDP endpoint listener with Twisted DatagramProtocol
  • Loading branch information
qstokkink authored May 9, 2018
2 parents 7af1c6e + fbcbdc2 commit 56f0d1f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 194 deletions.
4 changes: 2 additions & 2 deletions ipv8/messaging/interfaces/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def open(self):
pass

@abc.abstractmethod
def close(self, timeout=0.0):
def close(self):
pass


Expand All @@ -87,7 +87,7 @@ class EndpointListener(object):

__metaclass__ = abc.ABCMeta

def __init__(self, endpoint, main_thread=False):
def __init__(self, endpoint, main_thread=True):
"""
Create a new listener.
Expand Down
213 changes: 23 additions & 190 deletions ipv8/messaging/interfaces/udp/endpoint.py
Original file line number Diff line number Diff line change
@@ -1,223 +1,56 @@
import errno
from select import select
import socket
import sys
import threading
from time import time

from ..endpoint import DataTooBigException, Endpoint, EndpointClosedException, IllegalDestination

if sys.platform == 'win32':
SOCKET_BLOCK_ERRORCODE = 10035 # WSAEWOULDBLOCK
else:
SOCKET_BLOCK_ERRORCODE = errno.EWOULDBLOCK
from twisted.internet import protocol, reactor, error
from ..endpoint import Endpoint, EndpointClosedException

UDP_MAX_SIZE = 2 ** 16 - 60


class UDPEndpoint(Endpoint):
"""
UDP implementation for sending messages over the Internet.
"""
class UDPEndpoint(Endpoint, protocol.DatagramProtocol):

def __init__(self, port, ip="0.0.0.0"):
"""
Bind an interface to the WWW on a certain port (and IP).
:param port: the port to use
:type port: int
:param ip: the interface to bind to
:type ip: string
"""
super(UDPEndpoint, self).__init__()

self._port = port
self._ip = ip

self._thread = None
self._socket = None

self._sendqueue_lock = threading.RLock()
self._sendqueue = []

self._running = False
self._listening_port = False

def assert_open(self):
"""
Check if the underlying socket is open, or raise an exception.
:raises: EndpointClosedException if the socket is closed
"""
if not self.is_open():
raise EndpointClosedException(self)

def is_open(self):
"""
Check if the underlying socket is open.
"""
return self._socket and self._running

def get_address(self):
"""
Get the address for this Endpoint.
"""
self.assert_open()
return self._socket.getsockname()
def datagramReceived(self, datagram, addr):
self.notify_listeners((addr, datagram))

def send(self, socket_address, packet):
"""
Send a UDP packet to a socket_address.
:param socket_address: the socket_address to send the packet to
:param packet: the packet to send to the socket_address
:return: whether sending was successful
"""
self.assert_open()

if len(packet) > UDP_MAX_SIZE:
raise DataTooBigException(len(packet), UDP_MAX_SIZE)
if socket_address == ('0.0.0.0', 0):
raise IllegalDestination()

try:
self._socket.sendto(packet, socket_address)
except socket.error:
with self._sendqueue_lock:
did_have_senqueue = bool(self._sendqueue)
self._sendqueue.append((time(), socket_address, packet))

# If we did not have a sendqueue, then we need to call process_sendqueue in order send these messages
if not did_have_senqueue:
self._process_sendqueue()

return True
self.transport.write(packet, socket_address)

def open(self):
"""
Open this Endpoint for sending and receiving packets.
"""
for _ in xrange(10000):
try:
self._listening_port = reactor.listenUDP(self._port, self, self._ip, UDP_MAX_SIZE)
self._logger.debug("Listening at %d", self._port)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 870400)
self._socket.bind((self._ip, self._port))
self._socket.setblocking(0)

self._port = self._socket.getsockname()[1]
except socket.error:
break
except error.CannotListenError:
self._logger.debug("Listening failed at %d", self._port)
self._port += 1
continue
break

self._running = True
self._thread = threading.Thread(name="UDPEndpoint", target=self._loop)
self._thread.daemon = True
self._thread.start()
return True

def close(self, timeout=10.0):
"""
Close this Endpoint for sending and receiving.
:param timeout: the maximum amount of time to wait for the socket to close
:return: whether closing occurred naturally
"""
self._running = False
result = True

if timeout > 0.0:
self._thread.join(timeout)

if self._thread.is_alive():
self._logger.error("the endpoint thread is still running (after waiting %f seconds)", timeout)
result = False

else:
if self._thread.is_alive():
self._logger.error("the endpoint thread is still running "
"(use timeout > 0.0 to ensure the thread stops)")
result = False

try:
self._socket.close()
except socket.error as exception:
self._logger.exception("%s", exception)
result = False
def assert_open(self):
if not self._running:
raise EndpointClosedException(self)

return result
def close(self):
self._running = False
return self._listening_port.stopListening()

def _loop(self):
def get_address(self):
"""
The loop handling sending and receiving messages over this Endpoint.
Get the address for this Endpoint.
"""
self.assert_open()
return (self._listening_port.getHost().host,
self._listening_port.getHost().port)

recvfrom = self._socket.recvfrom
socket_list = [self._socket.fileno()]

prev_sendqueue = 0
while self._running:
# This is a tricky, if we are running on the DAS4 whenever a socket is ready for writing all processes of
# this node will try to write. Therefore, we have to limit the frequency of trying to write a bit.
if self._sendqueue and (time() - prev_sendqueue) > 0.1:
read_list, write_list, _ = select(socket_list, socket_list, [], 0.1)
else:
read_list, write_list, _ = select(socket_list, [], [], 0.1)

# Furthermore, if we are allowed to send, process sendqueue immediately
if write_list:
self._process_sendqueue()
prev_sendqueue = time()

if read_list:
packets = []
try:
while True:
(data, sock_addr) = recvfrom(65535)
if data:
packets.append((sock_addr, data))
else:
break

except socket.error as e:
if e.errno != errno.EAGAIN:
self._logger.debug('socket error: %s' % repr(e))

finally:
if packets:
for packet in packets:
self.notify_listeners(packet)

def _process_sendqueue(self):
def is_open(self):
"""
Send any outstanding/previously undeliverable messages.
Check if the underlying socket is open.
"""
self.assert_open()

with self._sendqueue_lock:
if self._sendqueue:
index = 0
NUM_PACKETS = min(max(50, len(self._sendqueue) / 10), len(self._sendqueue))
self._logger.debug("%d left in sendqueue, trying to send %d packets",
len(self._sendqueue), NUM_PACKETS)

allowed_timestamp = time() - 300

for i in xrange(NUM_PACKETS):
queued_at, sock_addr, data = self._sendqueue[i]
if queued_at > allowed_timestamp:
try:
self._socket.sendto(data, sock_addr)
index += 1
except socket.error as e:
if e[0] != SOCKET_BLOCK_ERRORCODE:
self._logger.warning("could not send %d to %s (%d in sendqueue)",
len(data), sock_addr, len(self._sendqueue))
break
else:
index += 1

self._sendqueue = self._sendqueue[index:]
if self._sendqueue:
self._logger.debug("%d left in sendqueue", len(self._sendqueue))
return self._listening_port and self._running
2 changes: 1 addition & 1 deletion ipv8/overlay.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, master_peer, my_peer, endpoint, network):
:param endpoint: the endpoint to use for messaging
:param network: the network graph backend
"""
EndpointListener.__init__(self, endpoint, False)
EndpointListener.__init__(self, endpoint)
TaskManager.__init__(self)
self.serializer = self.get_serializer()
self.crypto = ECCrypto()
Expand Down
2 changes: 1 addition & 1 deletion ipv8/test/mocking/exit_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self, parent):

TunnelExitSocket.__init__(self, parent.circuit_id, parent.overlay, parent.sock_addr, parent.mid)
parent.close()
EndpointListener.__init__(self, self.endpoint)
EndpointListener.__init__(self, self.endpoint, main_thread=False)

self.endpoint.add_listener(self)

Expand Down
1 change: 1 addition & 0 deletions ipv8/test/mocking/ipv8.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, crypto_curve, overlay_class, *args, **kwargs):
self.network = Network()
self.my_peer = Peer(ECCrypto().generate_key(crypto_curve), self.endpoint.wan_address)
self.overlay = overlay_class(self.my_peer, self.endpoint, self.network, *args, **kwargs)
self.overlay._use_main_thread = False
self.discovery = MockWalk(self.overlay)

self.overlay.my_estimated_wan = self.endpoint.wan_address
Expand Down

0 comments on commit 56f0d1f

Please sign in to comment.