-
Notifications
You must be signed in to change notification settings - Fork 65
/
invoke_process.py
144 lines (127 loc) · 4.98 KB
/
invoke_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-
"""
invoke_process.py
launch multiple subprocesses running SmallfileWorkload instance
Copyright 2012 -- Ben England
Licensed under the Apache License at http://www.apache.org/licenses/LICENSE-2.0
See Appendix on this page for instructions pertaining to license.
"""
import multiprocessing
import os
import shutil
import time
import smallfile
from smallfile import SMFRunException, unittest_module
from sync_files import touch
# this class launches multiple threads with SmallfileWorkload instances
# we do this because we can use > 1 core this way, with python threading,
# it doesn't really use > 1 core because of the GIL (global lock)
# occasional status reports could be sent back using pipe as well
class subprocess(multiprocessing.Process):
def __init__(self, invocation):
multiprocessing.Process.__init__(self)
(conn1, conn2) = multiprocessing.Pipe(False)
self.receiver = conn1 # master process receives test result data here
self.sender = conn2 # slave process sends test result data here
invocation.buf = None
invocation.biggest_buf = None
invocation.log = None
self.invoke = invocation # all workload generated by this object
def run(self):
try:
self.invoke.do_workload()
self.invoke.log.debug(
"exiting subprocess and returning invoke " + str(self.invoke)
)
except Exception as e:
print(
"Exception seen in thread %s host %s (tail %s) "
% (self.invoke.tid, self.invoke.onhost, self.invoke.log_fn())
)
self.invoke.log.error(str(e))
self.status = self.invoke.NOTOK
finally:
self.rsptimes = None # response time array already saved to file
# reduce amount of data returned from this thread
# by eliminating references objects that are no longer needed
self.invoke.log = None # log objects cannot be serialized
self.invoke.buf = None
self.invoke.biggest_buf = None
self.invoke.rsptimes = None
self.invoke.loggers = None
self.invoke.file_dirs = None
self.sender.send(self.invoke)
# below are unit tests for SmallfileWorkload
# including multi-threaded test
# to run, just do "python invoke_process.py"
class Test(unittest_module.TestCase):
def setUp(self):
self.invok = smallfile.SmallfileWorkload()
self.invok.debug = True
self.invok.verbose = True
self.invok.tid = "regtest"
self.invok.start_log()
shutil.rmtree(self.invok.src_dirs[0], ignore_errors=True)
os.makedirs(self.invok.src_dirs[0])
def test_multiproc_stonewall(self):
self.invok.log.info("starting stonewall test")
thread_ready_timeout = 4
thread_count = 4
for tree in self.invok.top_dirs:
shutil.rmtree(tree)
os.mkdir(tree)
for dir in self.invok.src_dirs:
os.mkdir(dir)
for dir in self.invok.dest_dirs:
os.mkdir(dir)
os.mkdir(self.invok.network_dir)
self.invok.starting_gate = os.path.join(self.invok.network_dir, "starting-gate")
sgate_file = self.invok.starting_gate
invokeList = []
for j in range(0, thread_count):
s = smallfile.SmallfileWorkload()
# s.log_to_stderr = True
s.verbose = True
s.tid = str(j)
s.prefix = "thr_"
s.suffix = "foo"
s.iterations = 10
s.stonewall = False
s.starting_gate = sgate_file
invokeList.append(s)
threadList = []
for s in invokeList:
threadList.append(subprocess(s))
for t in threadList:
t.start()
threads_ready = True
for i in range(0, thread_ready_timeout):
threads_ready = True
for s in invokeList:
thread_ready_file = s.gen_thread_ready_fname(s.tid)
if not os.path.exists(thread_ready_file):
threads_ready = False
if threads_ready:
break
time.sleep(1)
if not threads_ready:
raise SMFRunException(
"threads did not show up within %d seconds" % thread_ready_timeout
)
time.sleep(1)
touch(sgate_file)
for t in threadList:
rtnd_invok = t.receiver.recv()
t.join()
self.invok.log.info(str(rtnd_invok))
assert rtnd_invok.elapsed_time is not None
assert rtnd_invok.rq_final is not None
assert rtnd_invok.filenum_final is not None
if rtnd_invok.status != rtnd_invok.OK:
raise SMFRunException(
"subprocess failure for %s invocation %s: "
% (str(t), str(rtnd_invok))
)
# so you can just do "python invoke_process.py" to test it
if __name__ == "__main__":
unittest_module.main()