-
Notifications
You must be signed in to change notification settings - Fork 0
/
bird_multiprocess.py
136 lines (100 loc) · 3.66 KB
/
bird_multiprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import multiprocessing as mp
import time
import uuid
import threading
import logging
from typing import Callable, Any
class RpcContext(object):
    """One endpoint (master or slave) of an RPC channel carried over a pipe.

    The context owns the pipe end used to talk to the peer and, optionally,
    a background thread that runs ``rpc_dispatch_proc`` to deliver incoming
    commands to a user-supplied callback.
    """

    def __init__(self):
        # Role of this endpoint: 'master' or 'slave'.
        self.my_role: str = None
        # Launch arguments (only meaningful when my_role is 'slave').
        self.args = None
        # Pipe end connected to the peer.
        self.pipe_to_another = None
        # Session id (shared by parent and child).
        self.session_id: str = None
        # Thread running the RPC dispatch loop, or None when none is running.
        self.thread_rpc: threading.Thread = None
        # Checked by the RPC thread to decide whether it should exit.
        self.rpc_should_quit: bool = False

    def call_func_args(self, cmd_name, args):
        """Wrap ``cmd_name`` and ``args`` in a ``CustomCmd`` and send it to the peer."""
        cmd = CustomCmd(cmd_name, args)
        self.call_cmd(cmd)

    def call_cmd(self, cmd):
        """Send an already constructed command object to the peer."""
        self.pipe_to_another.send(cmd)

    def call_cmd_quit(self):
        """Stop the local dispatch thread, then send ``SysCmd_Quit`` to the peer."""
        logging.info("send cmd QUIT to %s", self.session_id)
        self.stop_rpc_callback()
        self.call_cmd(SysCmd_Quit())

    def start_rpc_function_callback(self, rpc_callback):
        """Start the dispatch thread; each command is passed to ``rpc_callback(cmd)``."""
        self._start_rpc_thread(None, rpc_callback)

    def start_rpc_class_callback(self, class_instance, class_callback):
        """Start the dispatch thread; each command is passed to
        ``class_callback(class_instance, cmd)``."""
        self._start_rpc_thread(class_instance, class_callback)

    def stop_rpc_callback(self):
        """Ask the dispatch thread to exit and wait up to 3 seconds for it.

        Does nothing when no dispatch thread has been started.
        """
        worker = self.thread_rpc
        if worker is None:
            return
        self.rpc_should_quit = True
        # A thread cannot join itself (RuntimeError); when called from inside
        # the dispatch thread, setting the flag is enough to end the loop.
        if worker is not threading.current_thread():
            worker.join(timeout=3)
        self.thread_rpc = None

    def _start_rpc_thread(self, class_instance, callback):
        """Create, record and start the thread that runs ``rpc_dispatch_proc``."""
        self.rpc_should_quit = False
        worker = threading.Thread(
            target=rpc_dispatch_proc,
            args=(self, class_instance, callback),
        )
        # Thread.start() returns None, so keep the Thread object itself;
        # otherwise stop_rpc_callback() would have nothing to stop or join.
        self.thread_rpc = worker
        worker.start()
class MasterControl(object):
    """Launches slave processes and hands back the master-side RPC context."""

    # Counter incremented on every launch; used to build the session id.
    global_session_id: int = 0

    def __init__(self):
        pass

    def launch(self, proc, args) -> RpcContext:
        """
        Start a child process.

        :param proc: callable run in the child as ``proc(slave_ctx)``
                     (see the example slave_main_demo)
        :param args: made available to the child as ``ctx.args``
        :return: the master-side context, connected to the child by a pipe
        """
        pipe_parent, pipe_child = mp.Pipe()
        MasterControl.global_session_id += 1
        session_id = 'proc' + str(MasterControl.global_session_id)

        slave_ctx = RpcContext()
        slave_ctx.my_role = 'slave'
        slave_ctx.pipe_to_another = pipe_child
        slave_ctx.args = args
        slave_ctx.session_id = session_id

        child = mp.Process(target=proc, args=(slave_ctx,))
        try:
            child.start()
        except Exception:
            # Nothing will ever talk to the parent end if the child never ran.
            pipe_parent.close()
            raise
        finally:
            # The child has its own copy of this end once start() returns.
            # Keeping the parent's copy open would leak a handle per launch
            # and stop the master from seeing EOFError when the child exits.
            pipe_child.close()

        master_context = RpcContext()
        master_context.my_role = 'master'
        master_context.pipe_to_another = pipe_parent
        master_context.session_id = session_id
        return master_context
class ProcessGroup:
    """Process group type; no state or behaviour is defined on it yet."""

    def __init__(self):
        """Create the instance; there is nothing to initialise."""
class SysCmd_Quit:
    """Command object that tells the receiving RPC loop to quit."""

    def __init__(self):
        """Create the command; it carries no payload."""

    def __str__(self):
        """Return the fixed display name of this command."""
        return 'SysCmd_Quit'
class CustomCmd:
    """User-defined command: a name plus an arbitrary argument payload."""

    def __init__(self, cmd_name: str = None, args=None):
        """Store the command name and its arguments unchanged."""
        self.cmd_name, self.args = cmd_name, args
def rpc_dispatch_proc(ctx: RpcContext, class_instance, callback):
    """Receive commands from ``ctx.pipe_to_another`` and pass them to ``callback``.

    Intended as a thread target. The loop ends when ``ctx.rpc_should_quit``
    becomes true, when the pipe is closed or broken, or after a
    ``SysCmd_Quit`` command has been delivered to the callback.

    :param ctx: context whose pipe is read and whose quit flag is checked
    :param class_instance: when not None, the callback is invoked as
                           ``callback(class_instance, cmd)``; otherwise as
                           ``callback(cmd)``
    :param callback: callable that handles each received command
    """
    while True:
        try:
            # Poll with a 1 second timeout so a stop request is noticed
            # even when the peer sends nothing.
            has_data = ctx.pipe_to_another.poll(1)
            if ctx.rpc_should_quit:
                return
            if not has_data:
                continue
            cmd = ctx.pipe_to_another.recv()
        except (EOFError, OSError) as exc:
            # EOFError: the peer closed its end. OSError: the pipe was closed
            # or reset. Either way there is nothing more to read.
            logging.info('%s %s, rpc pipe abort because: %s',
                         ctx.my_role, ctx.session_id, type(exc).__name__)
            return

        try:
            if class_instance is None:
                callback(cmd)
            else:
                callback(class_instance, cmd)
        except Exception as e:
            # A failing callback must not kill the dispatch loop, but keep
            # the traceback so the failure can be diagnosed.
            logging.exception('%s %s, encounter an exception %s',
                              ctx.my_role, ctx.session_id, e)

        # The quit command is still handed to the callback above before the
        # loop stops.
        if isinstance(cmd, SysCmd_Quit):
            ctx.rpc_should_quit = True
            logging.info('%s %s, recv SysCmd_Quit, will quit', ctx.my_role, ctx.session_id)
            return