-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathdmlc_local.py
executable file
·101 lines (89 loc) · 2.99 KB
/
dmlc_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
"""
DMLC submission script, local machine version
"""
import argparse
import sys
import os
import subprocess
from threading import Thread
import tracker
import signal
import logging
# Bash wrapper template used by LocalLauncher.exec_cmd on non-Windows hosts.
# The single %s placeholder is filled with the user command. The script
# re-runs that command for as long as it exits with status 254, exporting the
# zero-based attempt counter as DMLC_NUM_ATTEMPT before each run, and finally
# exits with the command's last (non-254) status.
keepalive = """
nrep=0
rc=254
while [ $rc -eq 254 ];
do
export DMLC_NUM_ATTEMPT=$nrep
%s
rc=$?;
nrep=$((nrep+1));
done
exit $rc
"""
class LocalLauncher(object):
    """Launch the worker and server commands of a DMLC job on this machine.

    Each worker/server runs the same shell command in its own daemon thread;
    the role is communicated to the command through the DMLC_ROLE
    environment variable.
    """

    def __init__(self, args, unknown):
        """Store the parsed options and build the command line to launch.

        Parameters
        ----------
        args : namespace with at least a ``command`` list attribute
            (``num_workers`` / ``num_servers`` are read later by ``run``).
        unknown : list of str
            Extra arguments appended verbatim after the command.
        """
        self.args = args
        self.cmd = ' '.join(args.command) + ' ' + ' '.join(unknown)

    def exec_cmd(self, cmd, role, pass_env):
        """Run ``cmd`` through the shell, retrying while it exits with 254.

        Parameters
        ----------
        cmd : str
            Shell command line to execute.
        role : str
            Value exported to the child as DMLC_ROLE.
        pass_env : dict
            Extra environment variables; values are converted with ``str``.

        Raises
        ------
        Exception
            On non-Windows hosts when the command finishes with a nonzero
            status. On Windows the failure is logged and ``sys.exit(-1)`` is
            called instead, which raises SystemExit in the calling thread.
        """
        env = os.environ.copy()
        for key, value in pass_env.items():
            env[key] = str(value)
        env['DMLC_ROLE'] = role

        if os.name == 'nt':
            # No bash on Windows: do the retry-on-254 loop in Python and
            # publish the attempt counter ourselves.
            ntrial = 0
            while True:
                env['DMLC_NUM_ATTEMPT'] = str(ntrial)
                ret = subprocess.call(cmd, shell=True, env=env)
                if ret != 254:
                    break
                ntrial += 1
        else:
            # The keepalive bash wrapper performs the same retry loop.
            ret = subprocess.call(keepalive % cmd, shell=True,
                                  executable='bash', env=env)

        if ret == 0:
            # The original message had a '%d' placeholder without an
            # argument; log the role so the line identifies the task.
            logging.debug('%s thread exit with 0', role)
            return
        if os.name == 'nt':
            # os.exit() does not exist; sys.exit() is the real API. Log first
            # because SystemExit raised outside the main thread is silent.
            logging.error('Get nonzero return code=%d', ret)
            sys.exit(-1)
        raise Exception('Get nonzero return code=%d' % ret)

    def submit(self):
        """Return the callback that starts one thread per worker/server."""
        def mthread_submit(nworker, nserver, envs):
            """
            customized submit script

            Starts ``nworker`` threads with role 'worker' followed by
            ``nserver`` threads with role 'server'; each runs ``exec_cmd``
            with ``envs`` as the extra environment. Threads are not joined.
            """
            for i in range(nworker + nserver):
                role = 'worker' if i < nworker else 'server'
                task = Thread(target=self.exec_cmd,
                              args=(self.cmd, role, envs))
                # Daemon threads do not keep the launcher alive on exit.
                # The attribute replaces the deprecated setDaemon().
                task.daemon = True
                task.start()
        return mthread_submit

    def run(self):
        """Configure logging and hand the job over to the tracker."""
        tracker.config_logger(self.args)
        tracker.submit(self.args.num_workers,
                       self.args.num_servers,
                       fun_submit=self.submit(),
                       pscmd=self.cmd)
def main():
    """Parse the command line and launch the job as local processes.

    Recognized options are declared in ``option_specs``; any arguments the
    parser does not recognize are forwarded to the launched command.
    """
    cli = argparse.ArgumentParser(
        description='DMLC script to submit dmlc jobs as local process')
    # (flags, keyword settings) pairs, registered in this order.
    option_specs = (
        (('-n', '--num-workers'),
         dict(default=0, type=int,
              help='number of worker nodes to be launched')),
        (('-s', '--num-servers'),
         dict(type=int,
              help='number of server nodes to be launched')),
        (('--log-level',),
         dict(default='INFO', type=str, choices=['INFO', 'DEBUG'],
              help='logging level')),
        (('--log-file',),
         dict(type=str,
              help='output log to the specific log file')),
        (('command',),
         dict(nargs='+',
              help='command for launching the program')),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    parsed, passthrough = cli.parse_known_args()
    LocalLauncher(parsed, passthrough).run()
# Run the launcher only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()