-
Notifications
You must be signed in to change notification settings - Fork 0
/
USocket.py
61 lines (44 loc) · 1.62 KB
/
USocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from socket import socket, AF_INET, SOCK_DGRAM, inet_aton, inet_ntoa
import time
sockets = {}
network = ('127.0.0.1', 12345)
def bytes_to_addr(bytes):
return inet_ntoa(bytes[:4]), int.from_bytes(bytes[4:8], 'big')
def addr_to_bytes(addr):
return inet_aton(addr[0]) + addr[1].to_bytes(4, 'big')
def get_sendto(id, rate=None):
if rate:
def sendto(data: bytes, addr):
time.sleep(len(data) / rate)
sockets[id].sendto(addr_to_bytes(addr) + data, network)
return sendto
else:
def sendto(data: bytes, addr):
sockets[id].sendto(addr_to_bytes(addr) + data, network)
return sendto
class UnreliableSocket:
def __init__(self, rate=None):
assert rate == None or rate > 0, 'Rate should be positive or None.'
sockets[id(self)] = socket(AF_INET, SOCK_DGRAM)
self.sendto = get_sendto(id(self), rate)
def bind(self, address: (str, int)):
sockets[id(self)].bind(address)
def recvfrom(self, bufsize):
data, frm = sockets[id(self)].recvfrom(bufsize)
addr = bytes_to_addr(data[:8])
if frm == network:
return data[8:], addr
else:
return self.recvfrom(bufsize)
def settimeout(self, value):
sockets[id(self)].settimeout(value)
def gettimeout(self):
return sockets[id(self)].gettimeout()
def setblocking(self, flag):
sockets[id(self)].setblocking(flag)
def getblocking(self):
sockets[id(self)].getblocking()
def getsockname(self):
return sockets[id(self)].getsockname()
def close(self):
sockets[id(self)].close()