-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
131 lines (106 loc) · 3.57 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import subprocess
import os
import time
import sys
import urllib.request
import urllib.error
# The server for cloning student files. For example, you usually clone from
# the server "https://github.com"
git_server = "https://github.com"
if len(sys.argv) is not 3:
print("usage: python {} project-name users-file.txt".format(sys.argv[0]))
exit(1)
project_name = sys.argv[1]
users_file = sys.argv[2]
with open(users_file) as f:
users = f.read().splitlines()
# Create the reports directories
os.makedirs(os.path.join("reports", project_name, "good"), exist_ok=True)
os.makedirs(os.path.join("reports", project_name, "bad"), exist_ok=True)
for user in users:
# If this is an http git server
if 'http' in git_server:
# Then confirm that the repo exists before trying to clone it
try:
response = urllib.request.urlopen("/".join([git_server, user, project_name]))
except urllib.error.HTTPError:
continue
# The path to this user's homework submission
repo_path = os.path.join('users', user, project_name)
# Remove any existing submission that we've downloaded; we're going to test
# whatever is in the repo right now.
subprocess.call(
[
"rm",
"-rf",
repo_path,
]
)
# Clone down the user's submission into repo_path
result = subprocess.call(
[
"git",
"clone",
"{}/{}/{}".format(git_server, user, project_name),
repo_path,
]
)
subprocess.call(
[
"cp",
os.path.join("tests", project_name, "tests.py"),
repo_path,
]
)
# Did we successfully clone down the project?
clone_successful = result is 0
# If not, then skip this student
if not clone_successful:
continue
time.sleep(3)
# Run the tests, collecting stdout and stderr
stdout = b""
stderr = b""
test_process = subprocess.Popen(
[
"python",
#os.path.join("tests", "tests.py"),
"tests.py"
],
cwd=repo_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
# Hope for the best
success = True
try:
# Retrieve stdout and stderr
stdout, stderr = test_process.communicate(timeout=10)
except subprocess.TimeoutExpired:
# If the tests took too long, then kill the process, record tests as failed
test_process.kill()
success = False
if test_process.returncode is None or test_process.returncode is not 0:
# If the tests failed, mark them as failed
success = False
# Build the paths for this student's good/bad reports
good_result_path = os.path.join("reports", project_name, "good", "{}.txt".format(user))
bad_result_path = os.path.join("reports", project_name, "bad", "{}.txt".format(user))
# Reformat stderr and stdout
contents = (stderr.decode().replace("\r\n", "\n") + "\n\n" + stdout.decode().replace("\r\n", "\n"))
# Remove any existing good/bad reports
for file_path in [bad_result_path, good_result_path]:
try:
os.unlink(file_path)
except FileNotFoundError:
pass
if success:
result_path = good_result_path
else:
result_path = bad_result_path
with open(result_path, "w") as f:
f.write(contents)
finally:
# In any case, kill the test process
test_process.kill()