-
Notifications
You must be signed in to change notification settings - Fork 8
/
__init__.py
135 lines (115 loc) · 3.92 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import threading
import socket
from . import protocol as p
from typing import Any, Callable
from .errors import ProtocolError, ServerError
from .const import StatusCode, Versions
__all__ = ["Server", "Client", "Versions", "ProtocolError", "ServerError"]
class Client:
def __init__(self, host: str, port: int):
self.family = socket.AF_INET
self.host = host
self.port = port
self.path = None
@classmethod
def from_unix(cls, path: str):
ins = cls('', 0)
ins.family = socket.AF_UNIX
ins.path = path
return ins
@classmethod
def from_inet(cls, host: str, port: int):
return cls(host, port)
def request(self, method: str, payload: Any = None) -> Any:
if self.path is not None:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.path)
else:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.host, self.port))
try:
p.write_request(s, method, payload)
code, result = p.read_response(s)
if code == StatusCode.GOOD_RESPONSE:
return result
else:
raise ServerError(result)
finally:
s.close()
class Server:
def __init__(self, host: str, port: int, delegate: Callable[[str, Any], Any]):
self.family = socket.AF_INET
self.host = host
self.port = port
self.path = None
self.started = False
self.lock = threading.Lock()
self.socket = None
self.delegate = delegate
@classmethod
def from_unix(cls, path: str, delegate: Callable[[str, Any], Any]):
ins = cls('', 0, delegate)
ins.family = socket.AF_UNIX
ins.path = path
return ins
@classmethod
def from_inet(cls, host: str, port: int, delegate: Callable[[str, Any], Any]):
return cls(host, port, delegate)
def start(self, daemon=True):
self.lock.acquire()
try:
if self.started:
return
if self.path is not None:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.path)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind((self.host, self.port))
self.socket.listen(0)
t = threading.Thread(target=_start_server_listener, args=(self,), daemon=daemon)
t.start()
self.started = True
except OSError as err:
self.socket.close()
raise err
finally:
self.lock.release()
def stop(self):
self.lock.acquire()
try:
if not self.started:
return
self.socket.close()
self.socket = None
self.started = False
finally:
self.lock.release()
def _start_server_listener(server: Server):
try:
while True:
server.lock.acquire()
if not server.started:
return
server.lock.release()
# assert started == true:
conn, _ = server.socket.accept()
t = threading.Thread(target=_start_connection, args=(server, conn,), daemon=True)
t.start()
except ConnectionAbortedError:
# socket stopped
pass
def _start_connection(server: Server, s: socket.socket):
try:
method, payload = p.read_request(s)
try:
result = server.delegate(method, payload)
p.write_good_response(s, result)
except Exception as err:
p.write_bad_response(s, str(err))
except Exception as err:
pass
finally:
s.close()