Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-115258: Fix failed tests on threading queue shutdown #115940

Merged
merged 9 commits into from
Mar 18, 2024
132 changes: 71 additions & 61 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,97 +317,107 @@ def test_shutdown_all_methods_in_one_thread(self):
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)

def _write_msg_thread(self, q, n, results, delay,
i_when_exec_shutdown,
event_start, event_end):
event_start.wait()
for i in range(1, n+1):
def _write_msg_thread(self, q, n, results,
i_when_exec_shutdown, event_shutdown,
barrier_start):
# All `write_msg_threads`
# put several items into the queue.
for i in range(0, i_when_exec_shutdown//2):
q.put((i, 'LOYD'))
# Wait for the barrier to be complete.
barrier_start.wait()

for i in range(i, n):
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
try:
q.put((i, "YDLO"))
results.append(True)
except self.queue.ShutDown:
results.append(False)
# triggers shutdown of queue
if i == i_when_exec_shutdown:
event_end.set()
time.sleep(delay)
# end of all puts
q.join()
break

def _read_msg_thread(self, q, nb, results, delay, event_start):
event_start.wait()
block = True
while nb:
time.sleep(delay)
# Trigger queue shutdown.
if i == i_when_exec_shutdown:
# Only once thread do it.
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
if not event_shutdown.is_set():
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
event_shutdown.set()
results.append(True)

def _read_msg_thread(self, q, results, barrier_start):
# Get at least one item.
q.get(True)
q.task_done()
# Wait for the barrier to be complete.
barrier_start.wait()
while True:
try:
# Get at least one message
q.get(block)
block = False
q.get(False)
q.task_done()
results.append(True)
nb -= 1
except self.queue.ShutDown:
results.append(False)
nb -= 1
results.append(True)
break
except self.queue.Empty:
pass
q.join()

def _shutdown_thread(self, q, event_end, immediate):
def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
q.join()
results.append(q.qsize() == 0)

def _join_thread(self, q, delay, event_start):
event_start.wait()
time.sleep(delay)
def _join_thread(self, q, barrier_start):
# Wait for the barrier to be complete.
barrier_start.wait()
q.join()

def _shutdown_all_methods_in_many_threads(self, immediate):
# Run a 'multi-producers/consumers queue' use case,
# with enough items into the queue.
# When shutdown, all running threads will be concerned.
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
q = self.type2test()
ps = []
ev_start = threading.Event()
ev_exec_shutdown = threading.Event()
res_puts = []
res_gets = []
delay = 1e-4
read_process = 4
nb_msgs = read_process * 16
nb_msgs_r = nb_msgs // read_process
when_exec_shutdown = nb_msgs // 2
lprocs = (
(self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,
when_exec_shutdown,
ev_start, ev_exec_shutdown)),
(self._read_msg_thread, read_process, (q, nb_msgs_r,
res_gets, delay*2,
ev_start)),
(self._join_thread, 2, (q, delay*2, ev_start)),
(self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
)
# start all threds
res_shutdown = []
write_threads = 4
read_threads = 6
join_threads = 2
nb_msgs = 1024*64
nb_msgs_w = nb_msgs // write_threads
when_exec_shutdown = nb_msgs_w // 2
# Use of a `threading.Barrier`` to ensure that
# all `_write_msg_threads`put their part of items into the queue
# all `_read_msg_thread` get at least one itme from the queue,
# and keep on running until shutdown.
# The `_join_thread` is started only when shutdown is emmediate.
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
nparties = write_threads + read_threads
if immediate:
nparties += join_threads
barrier_start = threading.Barrier(nparties)
ev_exec_shutdown = threading.Event()
lprocs = [
(self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
when_exec_shutdown, ev_exec_shutdown,
barrier_start)),
(self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
(self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
]
if immediate:
lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
# start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
# set event in order to run q.shutdown()
ev_start.set()

if not immediate:
assert(len(res_gets) == len(res_puts))
assert(res_gets.count(True) == res_puts.count(True))
else:
assert(len(res_gets) <= len(res_puts))
assert(res_gets.count(True) <= res_puts.count(True))

for thread in ps[1:]:
for thread in ps:
thread.join()

@unittest.skip("test times out (gh-115258)")
self.assertTrue(True in res_puts)
self.assertLessEqual(res_gets.count(True), read_threads)
if immediate:
self.assertListEqual(res_shutdown, [True])
self.assertTrue(q.empty())

def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)

@unittest.skip("test times out (gh-115258)")
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)

Expand Down
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a hang test in ``test_queue.GenericQueueTest.test_shutdown_[immediate_]all_methods_in_many_threads`` unittests. This unit test is dedicated to the new **shutdown** feature in the threading queue.
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
Loading