forked from UCL-INGI/INGInious
-
Notifications
You must be signed in to change notification settings - Fork 7
/
inginious-agent-docker
executable file
·110 lines (91 loc) · 4.43 KB
/
inginious-agent-docker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# This file is part of INGInious. See the LICENSE and the COPYRIGHTS files for
# more information about the licensing of this file.
#
""" Starts an agent """
import argparse
import logging
import os
import multiprocessing
import sys
from zmq.asyncio import ZMQEventLoop, Context
import asyncio
from inginious.common.entrypoints import get_args_and_filesystem
from inginious.agent.docker_agent import DockerAgent
def check_range(value):
value = value.split("-")
if len(value) != 2:
raise argparse.ArgumentTypeError("Port range should be in the form 'begin-end', for example 1000-2000")
try:
begin = int(value[0])
end = int(value[1])
except:
raise argparse.ArgumentTypeError("Port range should be in the form 'begin-end', for example 1000-2000")
if begin > end:
(begin, end) = end, begin
return range(begin, end+1)
def check_negative(value):
try:
ivalue = int(value)
except:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("%s is an invalid positive int value" % value)
return ivalue
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("backend", help="Address to the backend, in the form protocol://host:port. For example, tcp://127.0.0.1:2000", type=str)
parser.add_argument("--friendly-name", help="Friendly name to help identify agent.", default="", type=str)
parser.add_argument("--debug-host", help="Host used for job remote debugging. Should be an IP or an hostname. If not filled in, "
"it will be automatically guessed", default=None, type=str)
parser.add_argument("--debug-ports", help="Range of port for job remote debugging. By default it is 64120-64130", type=check_range, default="64120-64130")
parser.add_argument("--tmpdir", help="Path to a directory where the agent can store information, such as caches. Defaults to ./agent_data",
default="./agent_data")
parser.add_argument("--concurrency", help="Maximal number of jobs that can run concurrently on this agent. By default, it is the two times the "
"number of cores available.", default=multiprocessing.cpu_count(), type=check_negative)
parser.add_argument("-v", "--verbose", help="increase output verbosity",
action="store_true")
parser.add_argument("--debugmode", help="Enables debug mode. For developers only.", action="store_true")
parser.add_argument("--disable-autorestart", help="Disables the auto restart on agent failure.", action="store_true")
(args, fsprovider) = get_args_and_filesystem(parser)
if not os.path.exists(args.tmpdir):
os.makedirs(args.tmpdir)
# create logger
logger = logging.getLogger("inginious")
logger.setLevel(logging.INFO if not args.verbose else logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO if not args.verbose else logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
logger.addHandler(ch)
closing = False
while not closing:
# start asyncio and zmq
loop = ZMQEventLoop()
asyncio.set_event_loop(loop)
if args.debugmode:
loop.set_debug(True)
context = Context()
# Create agent
agent = DockerAgent(context, args.backend, args.friendly_name, args.concurrency, fsprovider, ssh_host=args.debug_host,
ssh_ports=args.debug_ports, tmp_dir=args.tmpdir)
# Run!
try:
loop.run_until_complete(agent.run())
except KeyboardInterrupt:
pass # do not restart in this case
except:
# log the error, and restart if needed
closing = args.disable_autorestart
if closing:
logger.exception("Agent has received an exception forcing it to end")
else:
logger.exception("Agent has received an exception forcing it to restart")
finally:
logger.info("Closing loop")
loop.close()
logger.info("Waiting for ZMQ to send remaining messages to backend (can take 1 sec)")
context.destroy(1000) # give zeromq 1 sec to send remaining messages
logger.info("Done")