-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathcall_python_test.py
104 lines (89 loc) · 3.67 KB
/
call_python_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Tests the `call_python_client` CLI and `call_python_server_test`
together."""
from contextlib import contextmanager
import os
import signal
import subprocess
import time
import unittest
@contextmanager
def scoped_file(filepath, is_fifo=False):
    """Creates `filepath` for the duration of the context, then removes it.

    Args:
        filepath: Path to create. It must not exist when the context is
            entered.
        is_fifo: If true, a named pipe is created with `os.mkfifo`;
            otherwise an empty regular file is created.

    Raises:
        AssertionError: If `filepath` already exists on entry.
        OSError: If the file cannot be created or removed.
    """
    assert not os.path.exists(filepath)
    # Create the file *before* entering the `try` block: if creation fails
    # there is nothing to clean up, and unlinking a missing path in `finally`
    # would raise a second error on top of the real one.
    if is_fifo:
        os.mkfifo(filepath)
    else:
        with open(filepath, 'w'):
            pass
    try:
        yield
    finally:
        os.unlink(filepath)
# Fail fast at import time when not launched by Bazel: TEST_TMPDIR is read
# below to locate the scratch files used by this test.
assert "TEST_TMPDIR" in os.environ, "Must run under `bazel test`"
# Set backend so that the test does not open windows.
os.environ["MPLBACKEND"] = "ps"
# Extra exit status tolerated from the server in the `with_error` case, where
# it may fail with SIGPIPE.
# NOTE(review): presumably 128 + SIGPIPE (13), as reported by a shell
# wrapper -- confirm.
SIGPIPE_STATUS = 141
# Directory containing this test file.
cur_dir = os.path.dirname(os.path.abspath(__file__))
# N.B. Need parent directories because this is under `test/*.py`, but the
# Bazel-generated script is one level above.
server_bin = os.path.join(cur_dir, "../call_python_server_test")
client_bin = os.path.join(cur_dir, "../call_python_client_cli")
# Path handed to both binaries via `--file`; the test creates it as a FIFO
# for each run.
file = os.path.join(os.environ["TEST_TMPDIR"], "python_rpc")
# Regular file holding the "done count" polled by `wait_for_done_count`;
# handed to the server via `--done_file`.
done_file = file + "_done"
def wait_for_done_count(num_expected, attempt_max=1000):
    """Polls `done_file` until the count it holds reaches `num_expected`.

    The file is expected to contain a single integer. It is read up to
    `attempt_max` times, sleeping 5 ms between consecutive reads, and the
    function returns as soon as a value >= `num_expected` is read.

    Args:
        num_expected: The count to wait for.
        attempt_max: Maximum number of reads before giving up.

    Raises:
        RuntimeError: If the count is not reached within `attempt_max`
            reads. The message lists the distinct values that were read.
    """
    done_count = -1
    attempt = 0
    values_read = set()
    while done_count < num_expected:
        # Check the budget *before* reading so that a successful final read
        # is never reported as a timeout.
        if attempt >= attempt_max:
            raise RuntimeError(
                "Did not get updated 'done count'. Read values: {}"
                .format(values_read))
        if attempt > 0:
            time.sleep(0.005)
        attempt += 1
        with open(done_file) as f:
            text = f.read().strip()
        try:
            done_count = int(text)
        except ValueError:
            # An empty or partially written file (presumably the writer is
            # mid-update) is not an integer yet; treat it as "no update" and
            # poll again rather than crashing.
            continue
        if done_count >= 0:
            values_read.add(done_count)
class TestCallPython(unittest.TestCase):
    """Runs the client CLI and the server test binary against each other."""

    def run_server_and_client(self, with_error):
        """Runs and tests server and client in parallel.

        Args:
            with_error: If true, passes `--with_error` to the server and
                `--stop_on_error` to the client, and expects the client to
                exit on its own with status 1. Otherwise the server is run
                twice, the client is then interrupted with SIGINT, and the
                client is expected to exit with status 0.
        """
        server_flags = ["--file=" + file, "--done_file=" + done_file]
        client_flags = ["--file=" + file]
        if with_error:
            server_flags += ["--with_error"]
            client_flags += ["--stop_on_error"]

        with scoped_file(file, is_fifo=True), scoped_file(done_file):
            with open(done_file, 'w') as f:
                f.write("0\n")
            # Every child started below is tracked so that a failed assertion
            # (or a timeout while polling) cannot leave a process behind,
            # e.g. a client still blocked on the FIFO.
            children = []
            try:
                # Start client.
                client = subprocess.Popen([client_bin] + client_flags)
                children.append(client)
                # Start server.
                server = subprocess.Popen([server_bin] + server_flags)
                children.append(server)
                # Join with processes, check return codes.
                server_valid_statuses = [0]
                if with_error:
                    # If the C++ binary has not finished by the time the
                    # Python client exits due to failure, then the C++ binary
                    # will fail with SIGPIPE.
                    server_valid_statuses.append(SIGPIPE_STATUS)
                self.assertIn(server.wait(), server_valid_statuses)
                if not with_error:
                    # Execute once more.
                    server = subprocess.Popen([server_bin] + server_flags)
                    children.append(server)
                    self.assertIn(server.wait(), server_valid_statuses)
                    # Wait until the client has indicated that the server
                    # process has run twice. We want to run twice to ensure
                    # that looping on the client end runs correctly.
                    wait_for_done_count(2)
                    client.send_signal(signal.SIGINT)
                client_status = client.wait()
                self.assertEqual(client_status, int(with_error))
            finally:
                # On the success path every child has already been waited
                # on, so this loop is a no-op.
                for child in children:
                    if child.poll() is None:
                        child.kill()
                        child.wait()

    def test_basic(self):
        """Exercises both the normal and the error scenario, in that order."""
        for with_error in [False, True]:
            print("[ with_error: {} ]".format(with_error))
            self.run_server_and_client(with_error)
        # TODO(eric.cousineau): Cover other use cases if it's useful, or prune
        # them from the code.