Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-115258: Fix failed tests on threading queue shutdown #115940

Merged
merged 9 commits into from
Mar 18, 2024
132 changes: 71 additions & 61 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,97 +317,107 @@ def test_shutdown_all_methods_in_one_thread(self):
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)

def _write_msg_thread(self, q, n, results, delay,
i_when_exec_shutdown,
event_start, event_end):
event_start.wait()
for i in range(1, n+1):
def _write_msg_thread(self, q, n, results,
i_when_exec_shutdown, event_shutdown,
barrier_start):
# All `write_msg_threads`
# put several items into the queue.
for i in range(0, i_when_exec_shutdown//2):
q.put((i, 'LOYD'))
# Wait for the barrier to be complete.
barrier_start.wait()

for i in range(i_when_exec_shutdown//2, n):
try:
q.put((i, "YDLO"))
results.append(True)
except self.queue.ShutDown:
results.append(False)
# triggers shutdown of queue
if i == i_when_exec_shutdown:
event_end.set()
time.sleep(delay)
# end of all puts
q.join()
break

def _read_msg_thread(self, q, nb, results, delay, event_start):
event_start.wait()
block = True
while nb:
time.sleep(delay)
# Trigger queue shutdown.
if i == i_when_exec_shutdown:
# Only one thread should call shutdown().
if not event_shutdown.is_set():
YvesDup marked this conversation as resolved.
Show resolved Hide resolved
event_shutdown.set()
results.append(True)

def _read_msg_thread(self, q, results, barrier_start):
# Get at least one item.
q.get(True)
q.task_done()
# Wait for the barrier to be complete.
barrier_start.wait()
while True:
try:
# Get at least one message
q.get(block)
block = False
q.get(False)
q.task_done()
results.append(True)
nb -= 1
except self.queue.ShutDown:
results.append(False)
nb -= 1
results.append(True)
break
except self.queue.Empty:
pass
q.join()

def _shutdown_thread(self, q, event_end, immediate):
def _shutdown_thread(self, q, results, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
q.join()
results.append(q.qsize() == 0)

def _join_thread(self, q, delay, event_start):
event_start.wait()
time.sleep(delay)
def _join_thread(self, q, barrier_start):
# Wait for the barrier to be complete.
barrier_start.wait()
q.join()

def _shutdown_all_methods_in_many_threads(self, immediate):
# Run a 'multi-producers/consumers queue' use case,
# with enough items into the queue.
# When shutdown, all running threads will be joined.
q = self.type2test()
ps = []
ev_start = threading.Event()
ev_exec_shutdown = threading.Event()
res_puts = []
res_gets = []
delay = 1e-4
read_process = 4
nb_msgs = read_process * 16
nb_msgs_r = nb_msgs // read_process
when_exec_shutdown = nb_msgs // 2
lprocs = (
(self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay,
when_exec_shutdown,
ev_start, ev_exec_shutdown)),
(self._read_msg_thread, read_process, (q, nb_msgs_r,
res_gets, delay*2,
ev_start)),
(self._join_thread, 2, (q, delay*2, ev_start)),
(self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
)
# start all threds
res_shutdown = []
write_threads = 4
read_threads = 6
join_threads = 2
nb_msgs = 1024*64
nb_msgs_w = nb_msgs // write_threads
when_exec_shutdown = nb_msgs_w // 2
# Use of a Barrier to ensure that
# - all write threads put all their items into the queue,
# - all read thread get at least one item from the queue,
# and keep on running until shutdown.
# The join thread is started only when shutdown is immediate.
nparties = write_threads + read_threads
if immediate:
nparties += join_threads
barrier_start = threading.Barrier(nparties)
ev_exec_shutdown = threading.Event()
lprocs = [
(self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
when_exec_shutdown, ev_exec_shutdown,
barrier_start)),
(self._read_msg_thread, read_threads, (q, res_gets, barrier_start)),
(self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, immediate)),
]
if immediate:
lprocs.append((self._join_thread, join_threads, (q, barrier_start)))
# start all threads.
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
# set event in order to run q.shutdown()
ev_start.set()

if not immediate:
assert(len(res_gets) == len(res_puts))
assert(res_gets.count(True) == res_puts.count(True))
else:
assert(len(res_gets) <= len(res_puts))
assert(res_gets.count(True) <= res_puts.count(True))

for thread in ps[1:]:
for thread in ps:
thread.join()

@unittest.skip("test times out (gh-115258)")
self.assertTrue(True in res_puts)
self.assertEqual(res_gets.count(True), read_threads)
if immediate:
self.assertListEqual(res_shutdown, [True])
self.assertTrue(q.empty())

def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)

@unittest.skip("test times out (gh-115258)")
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)

Expand Down
Loading