-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsockets.py
214 lines (173 loc) · 7.3 KB
/
sockets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#!/usr/bin/env python3
"""
A client Socket and socket thread that can be used for network communication.
"""
import threading
import socket
import Queue
import errno
class MySocket(object):
"""Standard python socket methods for sending and receiving data.
The socket runs in blocking mode with a timer of 02 second.
Blocking mode ensures that thread uses this socket suck 100% cpu circle
because the thread will be suspened by the OS if there is no data to
receive from the socket receive buffer and as such will yield cpu.
The timer ensures performance because it will wake up the thread
so that it can start to handle other jobs like sending
"""
def __init__(self, sock=None):
if sock is None:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.sock = sock
self.sock.settimeout(0.2)
def connect(self, host, port):
self.sock.connect((host, port))
def close(self):
self.sock.close()
def send_data(self, msg):
totalsent = 0
while totalsent < len(msg):
sent = self.sock.send(msg[totalsent:])
if sent == 0:
raise RuntimeError("Socket connection broken!")
totalsent = totalsent + sent
def receive_data(self, msg_length=4):
"""Receive the given message length from socket.
Return None if no message can be received. The actual message
received should never be shorter than the given 'msg_length'.
:param msg_length: Expected message length as an integer.
"""
chunks = []
bytes_recd = 0
while bytes_recd < msg_length:
try:
chunk = self.sock.recv(min(msg_length - bytes_recd, 2048))
except socket.timeout: # There is no data to receive.
return None
except socket.error as e:
if e.errno == errno.ECONNRESET:
# Connection reset by peer
pass
elif e.errno == errno.EBADF:
# Bad file descriptor
pass
else:
raise
else:
if chunk == '':
raise RuntimeError("Socket connection closed by server!")
chunks.append(chunk)
bytes_recd = bytes_recd + len(chunk)
return ''.join(chunks)
class SocketMethods(object):
"""Methods used by the socket thread."""
def __init__(self, socket):
self.socket = socket
self.send_queue = Queue.Queue()
self.receive_queue = Queue.Queue()
def get_header(self):
"""Return message header from the socket."""
return self.socket.receive_data(msg_length=self.header_size)
def sendfilter(self, msg):
"""Override this method to filter a message placed
in the send queue before it is sent."""
return msg
def receivefilter(self, msg):
"""Override this method to filter a received message
before it is placed in the receive queue. If this method
returns ``None``, nothing is placed in the receive queue."""
return msg
def get_msg_length(self, header):
"""Override to extract the total message length from a header,
should return the total message length, including the header as
an int.
:param header: The header with length self.header_size
"""
return self.msg_length
def handle_socket(self):
"""Handle data on the socket.
Check the input queue for messages to send and
also listen for incoming messages to receive.
If a header size is given, it is returned first and then
the msg length is extracted with the get_msg_length method.
"""
try:
msg = self.send_queue.get_nowait()
self.socket.send_data(self.sendfilter(msg))
except Queue.Empty:
if self.header_size is not None:
header = self.get_header()
if header:
msg_length = self.get_msg_length(header)
msg = header + self.socket.receive_data(msg_length-len(header))
if self.receivefilter(msg) is not None:
self.receive_queue.put(self.receivefilter(msg))
elif self.msg_length is not None:
msg = self.socket.receive_data(self.msg_length)
if self.receivefilter(msg) is not None:
self.receive_queue.put(self.receivefilter(msg))
class SocketThread(threading.Thread, SocketMethods):
"""Continously keep track of messages on a socket.
When new data have been put in the 'send_queue', the data is fetched from
the queue and sent via the socket to the server. Likewise, if new data is
found on the socket, it is fetched and put in the 'receive_queue' where it
can be accessed from other threads.
To filter the incoming message, override the ``receivefilter`` method;
likewise, to filter the outgoing message, override the ``sendfilter`` method.
If the total size of the message is given in a header, the ``get_msg_length``
method must also be overridden.
:param address: A tuple with ip-address (string) and port number (int).
:param send_queue: An instance of the Queue.Queue class for items to send.
:param receive_queue: An instance of the Queue.Queue class for items
received.
:param header_size: size of header to fetch.
:param msg_length: total size of message (used when the message size is fixed)
:param socket: Socket to use; a new one is created if none is given.
"""
def __init__(self, *args, **kwargs):
super(SocketThread, self).__init__()
self.name = 'SocketThread'
self.stop_run = False
self.msg_length = kwargs.pop('msg_length', None)
self.header_size = kwargs.pop('header_size', None)
self.send_queue = kwargs.pop('send_queue')
self.receive_queue = kwargs.pop('receive_queue')
self.exceptions = kwargs.pop('exceptions', None)
self.socket = kwargs.pop('socket', None)
if self.socket is None:
self.socket = MySocket()
host, port = kwargs.pop('address')
self.socket.connect(host, port)
else:
self.socket = MySocket(self.socket)
def stop(self):
self.socket.close()
self.stop_run = True
def run(self):
while not self.stop_run:
try:
self.handle_socket()
except (TypeError, RuntimeError) as e:
if self.exceptions is None:
raise
else:
self.exceptions.put(e)
self.socket.close()
self.stop_run = True
class SessionId(object):
# required fixed32 value = 1;
value = None
def __init__(self):
self.value = 0
def default(self, o):
return o.__dict__
def fromJSON(self, json):
if json:
self.value = json['value']
def toJSON(self):
jsonData = {}
jsonData['value'] = ""
if self.value or self.value is 0 or self.value is "" or self.value is False:
jsonData['value'] = self.value
return jsonData